Coverage for faststream / _internal / broker / pub_base.py: 93%
24 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from abc import abstractmethod
2from collections.abc import Sequence
3from functools import partial
4from typing import TYPE_CHECKING, Any, Generic
6from faststream._internal.endpoint.utils import process_msg
7from faststream._internal.types import MsgType
8from faststream.exceptions import FeatureNotSupportedException
9from faststream.message.source_type import SourceType
11if TYPE_CHECKING:
12 from faststream._internal.basic_types import SendableMessage
13 from faststream._internal.context import ContextRepo
14 from faststream._internal.producer import ProducerProto
15 from faststream._internal.types import BrokerMiddleware
16 from faststream.response import PublishCommand
19class BrokerPublishMixin(Generic[MsgType]):
20 @property
21 @abstractmethod
22 def middlewares(self) -> Sequence["BrokerMiddleware[MsgType]"]:
23 raise NotImplementedError
25 @property
26 @abstractmethod
27 def context(self) -> "ContextRepo":
28 raise NotImplementedError
30 @abstractmethod
31 async def publish(
32 self,
33 message: "SendableMessage",
34 queue: str,
35 /,
36 ) -> Any:
37 raise NotImplementedError
39 async def _basic_publish(
40 self,
41 cmd: "PublishCommand",
42 *,
43 producer: "ProducerProto[Any]",
44 ) -> Any:
45 publish = producer.publish
46 context = self.context # caches property
48 for m in self.middlewares[::-1]:
49 publish = partial(m(None, context=context).publish_scope, publish)
51 return await publish(cmd)
53 async def publish_batch(
54 self,
55 *messages: "SendableMessage",
56 queue: str,
57 ) -> Any:
58 msg = f"{self.__class__.__name__} doesn't support publishing in batches."
59 raise FeatureNotSupportedException(msg)
61 async def _basic_publish_batch(
62 self,
63 cmd: "PublishCommand",
64 *,
65 producer: "ProducerProto[Any]",
66 ) -> Any:
67 publish = producer.publish_batch
68 context = self.context # caches property
70 for m in self.middlewares[::-1]:
71 publish = partial(m(None, context=context).publish_scope, publish)
73 return await publish(cmd)
75 @abstractmethod
76 async def request(
77 self,
78 message: "SendableMessage",
79 queue: str,
80 /,
81 timeout: float = 0.5,
82 ) -> Any:
83 raise NotImplementedError
85 async def _basic_request(
86 self,
87 cmd: "PublishCommand",
88 *,
89 producer: "ProducerProto[Any]",
90 ) -> Any:
91 request = producer.request
92 context = self.context # caches property
94 for m in self.middlewares[::-1]:
95 request = partial(m(None, context=context).publish_scope, request)
97 published_msg = await request(cmd)
99 response_msg: Any = await process_msg(
100 msg=published_msg,
101 middlewares=(m(published_msg, context=context) for m in self.middlewares),
102 parser=producer._parser,
103 decoder=producer._decoder,
104 source_type=SourceType.RESPONSE,
105 )
106 return response_msg