Coverage for faststream / _internal / broker / pub_base.py: 93%

24 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from abc import abstractmethod 

2from collections.abc import Sequence 

3from functools import partial 

4from typing import TYPE_CHECKING, Any, Generic 

5 

6from faststream._internal.endpoint.utils import process_msg 

7from faststream._internal.types import MsgType 

8from faststream.exceptions import FeatureNotSupportedException 

9from faststream.message.source_type import SourceType 

10 

11if TYPE_CHECKING: 

12 from faststream._internal.basic_types import SendableMessage 

13 from faststream._internal.context import ContextRepo 

14 from faststream._internal.producer import ProducerProto 

15 from faststream._internal.types import BrokerMiddleware 

16 from faststream.response import PublishCommand 

17 

18 

19class BrokerPublishMixin(Generic[MsgType]): 

20 @property 

21 @abstractmethod 

22 def middlewares(self) -> Sequence["BrokerMiddleware[MsgType]"]: 

23 raise NotImplementedError 

24 

25 @property 

26 @abstractmethod 

27 def context(self) -> "ContextRepo": 

28 raise NotImplementedError 

29 

30 @abstractmethod 

31 async def publish( 

32 self, 

33 message: "SendableMessage", 

34 queue: str, 

35 /, 

36 ) -> Any: 

37 raise NotImplementedError 

38 

39 async def _basic_publish( 

40 self, 

41 cmd: "PublishCommand", 

42 *, 

43 producer: "ProducerProto[Any]", 

44 ) -> Any: 

45 publish = producer.publish 

46 context = self.context # caches property 

47 

48 for m in self.middlewares[::-1]: 

49 publish = partial(m(None, context=context).publish_scope, publish) 

50 

51 return await publish(cmd) 

52 

53 async def publish_batch( 

54 self, 

55 *messages: "SendableMessage", 

56 queue: str, 

57 ) -> Any: 

58 msg = f"{self.__class__.__name__} doesn't support publishing in batches." 

59 raise FeatureNotSupportedException(msg) 

60 

61 async def _basic_publish_batch( 

62 self, 

63 cmd: "PublishCommand", 

64 *, 

65 producer: "ProducerProto[Any]", 

66 ) -> Any: 

67 publish = producer.publish_batch 

68 context = self.context # caches property 

69 

70 for m in self.middlewares[::-1]: 

71 publish = partial(m(None, context=context).publish_scope, publish) 

72 

73 return await publish(cmd) 

74 

75 @abstractmethod 

76 async def request( 

77 self, 

78 message: "SendableMessage", 

79 queue: str, 

80 /, 

81 timeout: float = 0.5, 

82 ) -> Any: 

83 raise NotImplementedError 

84 

85 async def _basic_request( 

86 self, 

87 cmd: "PublishCommand", 

88 *, 

89 producer: "ProducerProto[Any]", 

90 ) -> Any: 

91 request = producer.request 

92 context = self.context # caches property 

93 

94 for m in self.middlewares[::-1]: 

95 request = partial(m(None, context=context).publish_scope, request) 

96 

97 published_msg = await request(cmd) 

98 

99 response_msg: Any = await process_msg( 

100 msg=published_msg, 

101 middlewares=(m(published_msg, context=context) for m in self.middlewares), 

102 parser=producer._parser, 

103 decoder=producer._decoder, 

104 source_type=SourceType.RESPONSE, 

105 ) 

106 return response_msg