Coverage for faststream / confluent / publisher / usecase.py: 96%
56 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from collections.abc import Iterable
3from typing import TYPE_CHECKING, Any, Literal, Union, cast, overload
5from confluent_kafka import Message
6from typing_extensions import override
8from faststream._internal.endpoint.publisher import (
9 PublisherSpecification,
10 PublisherUsecase,
11)
12from faststream.confluent.response import KafkaPublishCommand
13from faststream.message import gen_cor_id
14from faststream.response.publish_type import PublishType
16if TYPE_CHECKING:
17 from faststream._internal.basic_types import SendableMessage
18 from faststream._internal.types import PublisherMiddleware
19 from faststream.confluent.message import KafkaMessage
20 from faststream.response.response import PublishCommand
22 from .config import KafkaPublisherConfig
23 from .producer import AsyncConfluentFastProducer
26class LogicPublisher(PublisherUsecase):
27 """A class to publish messages to a Kafka topic."""
29 def __init__(
30 self,
31 config: "KafkaPublisherConfig",
32 specifcication: "PublisherSpecification[Any, Any]",
33 ) -> None:
34 super().__init__(config, specifcication)
36 self._topic = config.topic
37 self.partition = config.partition
38 self.reply_to = config.reply_to
39 self.headers = config.headers or {}
41 @property
42 def topic(self) -> str:
43 return f"{self._outer_config.prefix}{self._topic}"
45 @override
46 async def request(
47 self,
48 message: "SendableMessage",
49 topic: str = "",
50 *,
51 key: bytes | str | None = None,
52 partition: int | None = None,
53 timestamp_ms: int | None = None,
54 headers: dict[str, str] | None = None,
55 correlation_id: str | None = None,
56 timeout: float = 0.5,
57 ) -> "KafkaMessage":
58 cmd = KafkaPublishCommand(
59 message,
60 topic=topic or self.topic,
61 key=key,
62 partition=partition if partition is not None else self.partition,
63 headers=self.headers | (headers or {}),
64 correlation_id=correlation_id or gen_cor_id(),
65 timestamp_ms=timestamp_ms,
66 timeout=timeout,
67 _publish_type=PublishType.REQUEST,
68 )
70 msg: KafkaMessage = await self._basic_request(
71 cmd,
72 producer=self._outer_config.producer,
73 )
74 return msg
76 async def flush(self) -> None:
77 producer = cast("AsyncConfluentFastProducer", self._outer_config.producer)
78 await producer.flush()
81class DefaultPublisher(LogicPublisher):
82 def __init__(
83 self,
84 config: "KafkaPublisherConfig",
85 specifcication: "PublisherSpecification[Any, Any]",
86 ) -> None:
87 super().__init__(config, specifcication)
89 self.key = config.key
91 @overload
92 async def publish(
93 self,
94 message: "SendableMessage",
95 topic: str = "",
96 *,
97 key: bytes | str | None = None,
98 partition: int | None = None,
99 timestamp_ms: int | None = None,
100 headers: dict[str, str] | None = None,
101 correlation_id: str | None = None,
102 reply_to: str = "",
103 no_confirm: Literal[True] = ...,
104 ) -> asyncio.Future[Message | None]: ...
106 @overload
107 async def publish(
108 self,
109 message: "SendableMessage",
110 topic: str = "",
111 *,
112 key: bytes | str | None = None,
113 partition: int | None = None,
114 timestamp_ms: int | None = None,
115 headers: dict[str, str] | None = None,
116 correlation_id: str | None = None,
117 reply_to: str = "",
118 no_confirm: Literal[False] = False,
119 ) -> Message | None: ...
121 @overload
122 async def publish(
123 self,
124 message: "SendableMessage",
125 topic: str = "",
126 *,
127 key: bytes | str | None = None,
128 partition: int | None = None,
129 timestamp_ms: int | None = None,
130 headers: dict[str, str] | None = None,
131 correlation_id: str | None = None,
132 reply_to: str = "",
133 no_confirm: bool = False,
134 ) -> asyncio.Future[Message | None] | Message | None: ...
136 @override
137 async def publish(
138 self,
139 message: "SendableMessage",
140 topic: str = "",
141 *,
142 key: bytes | str | None = None,
143 partition: int | None = None,
144 timestamp_ms: int | None = None,
145 headers: dict[str, str] | None = None,
146 correlation_id: str | None = None,
147 reply_to: str = "",
148 no_confirm: bool = False,
149 ) -> asyncio.Future[Message | None] | Message | None:
150 cmd = KafkaPublishCommand(
151 message,
152 topic=topic or self.topic,
153 key=key or self.key,
154 partition=partition if partition is not None else self.partition,
155 reply_to=reply_to or self.reply_to,
156 headers=self.headers | (headers or {}),
157 correlation_id=correlation_id or gen_cor_id(),
158 timestamp_ms=timestamp_ms,
159 no_confirm=no_confirm,
160 _publish_type=PublishType.PUBLISH,
161 )
162 msg: asyncio.Future[Message | None] | Message | None = await self._basic_publish(
163 cmd,
164 producer=self._outer_config.producer,
165 _extra_middlewares=(),
166 )
167 return msg
169 @override
170 async def _publish(
171 self,
172 cmd: Union["PublishCommand", "KafkaPublishCommand"],
173 *,
174 _extra_middlewares: Iterable["PublisherMiddleware"],
175 ) -> None:
176 """This method should be called in subscriber flow only."""
177 cmd = KafkaPublishCommand.from_cmd(cmd)
179 cmd.destination = self.topic
180 cmd.add_headers(self.headers, override=False)
181 cmd.reply_to = cmd.reply_to or self.reply_to
183 cmd.partition = cmd.partition if cmd.partition is not None else self.partition
184 cmd.key = cmd.key or self.key
186 await self._basic_publish(
187 cmd,
188 producer=self._outer_config.producer,
189 _extra_middlewares=_extra_middlewares,
190 )
192 @override
193 async def request(
194 self,
195 message: "SendableMessage",
196 topic: str = "",
197 *,
198 key: bytes | str | None = None,
199 partition: int | None = None,
200 timestamp_ms: int | None = None,
201 headers: dict[str, str] | None = None,
202 correlation_id: str | None = None,
203 timeout: float = 0.5,
204 ) -> "KafkaMessage":
205 return await super().request(
206 message,
207 topic=topic,
208 key=key or self.key,
209 partition=partition,
210 timestamp_ms=timestamp_ms,
211 headers=headers,
212 correlation_id=correlation_id,
213 timeout=timeout,
214 )
217class BatchPublisher(LogicPublisher):
218 def __init__(
219 self,
220 config: "KafkaPublisherConfig",
221 specification: "PublisherSpecification[Any, Any]",
222 ) -> None:
223 super().__init__(config, specification)
224 self.key = config.key
226 @override
227 async def publish(
228 self,
229 *messages: "SendableMessage",
230 topic: str = "",
231 key: bytes | str | None = None,
232 partition: int | None = None,
233 timestamp_ms: int | None = None,
234 headers: dict[str, str] | None = None,
235 correlation_id: str | None = None,
236 reply_to: str = "",
237 no_confirm: bool = False,
238 ) -> None:
239 cmd = KafkaPublishCommand(
240 *messages,
241 key=key or self.key,
242 topic=topic or self.topic,
243 partition=partition if partition is not None else self.partition,
244 reply_to=reply_to or self.reply_to,
245 headers=self.headers | (headers or {}),
246 correlation_id=correlation_id or gen_cor_id(),
247 timestamp_ms=timestamp_ms,
248 no_confirm=no_confirm,
249 _publish_type=PublishType.PUBLISH,
250 )
252 await self._basic_publish_batch(
253 cmd,
254 producer=self._outer_config.producer,
255 _extra_middlewares=(),
256 )
258 @override
259 async def _publish(
260 self,
261 cmd: Union["PublishCommand", "KafkaPublishCommand"],
262 *,
263 _extra_middlewares: Iterable["PublisherMiddleware"],
264 ) -> None:
265 """This method should be called in subscriber flow only."""
266 cmd = KafkaPublishCommand.from_cmd(cmd, batch=True)
268 cmd.destination = self.topic
269 cmd.add_headers(self.headers, override=False)
270 cmd.reply_to = cmd.reply_to or self.reply_to
272 cmd.partition = cmd.partition if cmd.partition is not None else self.partition
273 cmd.key = cmd.key or self.key
275 await self._basic_publish_batch(
276 cmd,
277 producer=self._outer_config.producer,
278 _extra_middlewares=_extra_middlewares,
279 )