Coverage for faststream / kafka / publisher / usecase.py: 96%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Iterable 

2from typing import TYPE_CHECKING, Any, Literal, Union, cast, overload 

3 

4from typing_extensions import override 

5 

6from faststream._internal.endpoint.publisher import PublisherUsecase 

7from faststream.kafka.response import KafkaPublishCommand 

8from faststream.message import gen_cor_id 

9from faststream.response.publish_type import PublishType 

10 

11if TYPE_CHECKING: 

12 import asyncio 

13 

14 from aiokafka.structs import RecordMetadata 

15 

16 from faststream._internal.basic_types import SendableMessage 

17 from faststream._internal.endpoint.publisher import PublisherSpecification 

18 from faststream._internal.types import PublisherMiddleware 

19 from faststream.kafka.message import KafkaMessage 

20 from faststream.response.response import PublishCommand 

21 

22 from .config import KafkaPublisherConfig 

23 from .producer import AioKafkaFastProducer 

24 

25 

26class LogicPublisher(PublisherUsecase): 

27 """A class to publish messages to a Kafka topic.""" 

28 

29 def __init__( 

30 self, 

31 config: "KafkaPublisherConfig", 

32 specification: "PublisherSpecification[Any, Any]", 

33 ) -> None: 

34 super().__init__(config, specification) 

35 

36 self._topic = config.topic 

37 self.partition = config.partition 

38 self.reply_to = config.reply_to 

39 self.headers = config.headers or {} 

40 

41 @property 

42 def topic(self) -> str: 

43 return f"{self._outer_config.prefix}{self._topic}" 

44 

45 @override 

46 async def request( 

47 self, 

48 message: "SendableMessage", 

49 topic: str = "", 

50 *, 

51 key: bytes | Any | None = None, 

52 partition: int | None = None, 

53 timestamp_ms: int | None = None, 

54 headers: dict[str, str] | None = None, 

55 correlation_id: str | None = None, 

56 timeout: float = 0.5, 

57 ) -> "KafkaMessage": 

58 """Send a request message to Kafka topic. 

59 

60 Args: 

61 message: Message body to send. 

62 topic: Topic where the message will be published. 

63 key: A key to associate with the message. Can be used to 

64 determine which partition to send the message to. If partition 

65 is `None` (and producer's partitioner config is left as default), 

66 then messages with the same key will be delivered to the same 

67 partition (but if key is `None`, partition is chosen randomly). 

68 Must be type `bytes`, or be serializable to bytes via configured 

69 `key_serializer`. 

70 partition: Specify a partition. If not set, the partition will be 

71 selected using the configured `partitioner`. 

72 timestamp_ms: Epoch milliseconds (from Jan 1 1970 UTC) to use as 

73 the message timestamp. Defaults to current time. 

74 headers: Message headers to store metainformation. 

75 correlation_id: Manual message **correlation_id** setter. 

76 **correlation_id** is a useful option to trace messages. 

77 timeout: Timeout to send RPC request. 

78 

79 Returns: 

80 KafkaMessage: The response message. 

81 """ 

82 cmd = KafkaPublishCommand( 

83 message, 

84 topic=topic or self.topic, 

85 key=key, 

86 partition=partition if partition is not None else self.partition, 

87 headers=self.headers | (headers or {}), 

88 correlation_id=correlation_id or gen_cor_id(), 

89 timestamp_ms=timestamp_ms, 

90 timeout=timeout, 

91 _publish_type=PublishType.REQUEST, 

92 ) 

93 

94 msg: KafkaMessage = await self._basic_request( 

95 cmd, 

96 producer=self._outer_config.producer, 

97 ) 

98 return msg 

99 

100 async def flush(self) -> None: 

101 producer = cast("AioKafkaFastProducer", self._outer_config.producer) 

102 await producer.flush() 

103 

104 

105class DefaultPublisher(LogicPublisher): 

106 def __init__( 

107 self, 

108 config: "KafkaPublisherConfig", 

109 specification: "PublisherSpecification[Any, Any]", 

110 ) -> None: 

111 super().__init__(config, specification) 

112 

113 self.key = config.key 

114 

115 @overload 

116 async def publish( 

117 self, 

118 message: "SendableMessage", 

119 topic: str = "", 

120 *, 

121 key: bytes | Any | None = None, 

122 partition: int | None = None, 

123 timestamp_ms: int | None = None, 

124 headers: dict[str, str] | None = None, 

125 correlation_id: str | None = None, 

126 reply_to: str = "", 

127 no_confirm: Literal[False] = False, 

128 ) -> "RecordMetadata": ... 

129 

130 @overload 

131 async def publish( 

132 self, 

133 message: "SendableMessage", 

134 topic: str = "", 

135 *, 

136 key: bytes | Any | None = None, 

137 partition: int | None = None, 

138 timestamp_ms: int | None = None, 

139 headers: dict[str, str] | None = None, 

140 correlation_id: str | None = None, 

141 reply_to: str = "", 

142 no_confirm: Literal[True] = ..., 

143 ) -> "asyncio.Future[RecordMetadata]": ... 

144 

145 @overload 

146 async def publish( 

147 self, 

148 message: "SendableMessage", 

149 topic: str = "", 

150 *, 

151 key: bytes | Any | None = None, 

152 partition: int | None = None, 

153 timestamp_ms: int | None = None, 

154 headers: dict[str, str] | None = None, 

155 correlation_id: str | None = None, 

156 reply_to: str = "", 

157 no_confirm: bool = False, 

158 ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: ... 

159 

160 @override 

161 async def publish( 

162 self, 

163 message: "SendableMessage", 

164 topic: str = "", 

165 *, 

166 key: bytes | Any | None = None, 

167 partition: int | None = None, 

168 timestamp_ms: int | None = None, 

169 headers: dict[str, str] | None = None, 

170 correlation_id: str | None = None, 

171 reply_to: str = "", 

172 no_confirm: bool = False, 

173 ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: 

174 """Publishes a message to Kafka. 

175 

176 Args: 

177 message: 

178 Message body to send. 

179 topic: 

180 Topic where the message will be published. 

181 key: 

182 A key to associate with the message. Can be used to 

183 determine which partition to send the message to. If partition 

184 is `None` (and producer's partitioner config is left as default), 

185 then messages with the same key will be delivered to the same 

186 partition (but if key is `None`, partition is chosen randomly). 

187 Must be type `bytes`, or be serializable to bytes via configured 

188 `key_serializer` 

189 partition: 

190 Specify a partition. If not set, the partition will be 

191 selected using the configured `partitioner` 

192 timestamp_ms: 

193 Epoch milliseconds (from Jan 1 1970 UTC) to use as 

194 the message timestamp. Defaults to current time. 

195 headers: 

196 Message headers to store metainformation. 

197 correlation_id: 

198 Manual message **correlation_id** setter. 

199 **correlation_id** is a useful option to trace messages. 

200 reply_to: 

201 Reply message topic name to send response. 

202 no_confirm: 

203 Do not wait for Kafka publish confirmation. 

204 

205 Returns: 

206 `asyncio.Future[RecordMetadata]` if no_confirm = True. 

207 `RecordMetadata` if no_confirm = False. 

208 """ 

209 cmd = KafkaPublishCommand( 

210 message, 

211 topic=topic or self.topic, 

212 key=key or self.key, 

213 partition=partition if partition is not None else self.partition, 

214 reply_to=reply_to or self.reply_to, 

215 headers=self.headers | (headers or {}), 

216 correlation_id=correlation_id or gen_cor_id(), 

217 timestamp_ms=timestamp_ms, 

218 no_confirm=no_confirm, 

219 _publish_type=PublishType.PUBLISH, 

220 ) 

221 return await self._basic_publish( 

222 cmd, 

223 producer=self._outer_config.producer, 

224 _extra_middlewares=(), 

225 ) 

226 

227 @override 

228 async def _publish( 

229 self, 

230 cmd: Union["PublishCommand", "KafkaPublishCommand"], 

231 *, 

232 _extra_middlewares: Iterable["PublisherMiddleware"], 

233 ) -> None: 

234 """This method should be called in subscriber flow only.""" 

235 cmd = KafkaPublishCommand.from_cmd(cmd) 

236 

237 cmd.destination = self.topic 

238 cmd.add_headers(self.headers, override=False) 

239 cmd.reply_to = cmd.reply_to or self.reply_to 

240 

241 cmd.partition = cmd.partition if cmd.partition is not None else self.partition 

242 cmd.key = cmd.key or self.key 

243 

244 await self._basic_publish( 

245 cmd, 

246 producer=self._outer_config.producer, 

247 _extra_middlewares=_extra_middlewares, 

248 ) 

249 

250 @override 

251 async def request( 

252 self, 

253 message: "SendableMessage", 

254 topic: str = "", 

255 *, 

256 key: bytes | Any | None = None, 

257 partition: int | None = None, 

258 timestamp_ms: int | None = None, 

259 headers: dict[str, str] | None = None, 

260 correlation_id: str | None = None, 

261 timeout: float = 0.5, 

262 ) -> "KafkaMessage": 

263 """Send a request message and wait for a response. 

264 

265 Args: 

266 message: Message body to send. 

267 topic: Topic where the message will be published. 

268 key: A key to associate with the message. Can be used to 

269 determine which partition to send the message to. If partition 

270 is `None` (and producer's partitioner config is left as default), 

271 then messages with the same key will be delivered to the same 

272 partition (but if key is `None`, partition is chosen randomly). 

273 Must be type `bytes`, or be serializable to bytes via configured 

274 `key_serializer`. 

275 partition: Specify a partition. If not set, the partition will be 

276 selected using the configured `partitioner`. 

277 timestamp_ms: Epoch milliseconds (from Jan 1 1970 UTC) to use as 

278 the message timestamp. Defaults to current time. 

279 headers: Message headers to store metainformation. 

280 correlation_id: Manual message **correlation_id** setter. 

281 **correlation_id** is a useful option to trace messages. 

282 timeout: Timeout to send RPC request. 

283 

284 Returns: 

285 The response message. 

286 """ 

287 return await super().request( 

288 message, 

289 topic=topic, 

290 key=key or self.key, 

291 partition=partition, 

292 timestamp_ms=timestamp_ms, 

293 headers=headers, 

294 correlation_id=correlation_id, 

295 timeout=timeout, 

296 ) 

297 

298 

299class BatchPublisher(LogicPublisher): 

300 def __init__( 

301 self, 

302 config: "KafkaPublisherConfig", 

303 specification: "PublisherSpecification[Any, Any]", 

304 ) -> None: 

305 super().__init__(config, specification) 

306 self.key = config.key 

307 

308 @overload 

309 async def publish( 

310 self, 

311 *messages: "SendableMessage", 

312 topic: str = "", 

313 key: bytes | Any | None = None, 

314 partition: int | None = None, 

315 timestamp_ms: int | None = None, 

316 headers: dict[str, str] | None = None, 

317 reply_to: str = "", 

318 correlation_id: str | None = None, 

319 no_confirm: Literal[False] = False, 

320 ) -> "RecordMetadata": ... 

321 

322 @overload 

323 async def publish( 

324 self, 

325 *messages: "SendableMessage", 

326 topic: str = "", 

327 key: bytes | Any | None = None, 

328 partition: int | None = None, 

329 timestamp_ms: int | None = None, 

330 headers: dict[str, str] | None = None, 

331 reply_to: str = "", 

332 correlation_id: str | None = None, 

333 no_confirm: Literal[True] = ..., 

334 ) -> "asyncio.Future[RecordMetadata]": ... 

335 

336 @overload 

337 async def publish( 

338 self, 

339 *messages: "SendableMessage", 

340 topic: str = "", 

341 key: bytes | Any | None = None, 

342 partition: int | None = None, 

343 timestamp_ms: int | None = None, 

344 headers: dict[str, str] | None = None, 

345 reply_to: str = "", 

346 correlation_id: str | None = None, 

347 no_confirm: bool = False, 

348 ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: ... 

349 

350 @override 

351 async def publish( 

352 self, 

353 *messages: "SendableMessage", 

354 topic: str = "", 

355 key: bytes | Any | None = None, 

356 partition: int | None = None, 

357 timestamp_ms: int | None = None, 

358 headers: dict[str, str] | None = None, 

359 reply_to: str = "", 

360 correlation_id: str | None = None, 

361 no_confirm: bool = False, 

362 ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: 

363 """Publish a message batch as a single request to broker. 

364 

365 Args: 

366 *messages: 

367 Messages bodies to send. 

368 topic: 

369 Topic where the message will be published. 

370 key: 

371 A single key to associate with every message in this batch. If a 

372 partition is not specified and the producer uses the default 

373 partitioner, messages with the same key will be routed to the 

374 same partition. Must be bytes or serializable to bytes via the 

375 configured key serializer. If omitted, falls back to the 

376 publisher's default key (if configured). 

377 partition: 

378 Specify a partition. If not set, the partition will be 

379 selected using the configured `partitioner` 

380 timestamp_ms: 

381 Epoch milliseconds (from Jan 1 1970 UTC) to use as 

382 the message timestamp. Defaults to current time. 

383 headers: 

384 Message headers to store metainformation. 

385 reply_to: 

386 Reply message topic name to send response. 

387 correlation_id: 

388 Manual message **correlation_id** setter. 

389 **correlation_id** is a useful option to trace messages. 

390 no_confirm: 

391 Do not wait for Kafka publish confirmation. 

392 

393 Returns: 

394 `asyncio.Future[RecordMetadata]` if no_confirm = True. 

395 `RecordMetadata` if no_confirm = False. 

396 """ 

397 cmd = KafkaPublishCommand( 

398 *messages, 

399 key=key or self.key, 

400 topic=topic or self.topic, 

401 partition=partition if partition is not None else self.partition, 

402 reply_to=reply_to or self.reply_to, 

403 headers=self.headers | (headers or {}), 

404 correlation_id=correlation_id or gen_cor_id(), 

405 timestamp_ms=timestamp_ms, 

406 no_confirm=no_confirm, 

407 _publish_type=PublishType.PUBLISH, 

408 ) 

409 

410 return await self._basic_publish_batch( 

411 cmd, 

412 producer=self._outer_config.producer, 

413 _extra_middlewares=(), 

414 ) 

415 

416 @override 

417 async def _publish( 

418 self, 

419 cmd: Union["PublishCommand", "KafkaPublishCommand"], 

420 *, 

421 _extra_middlewares: Iterable["PublisherMiddleware"], 

422 ) -> None: 

423 """This method should be called in subscriber flow only.""" 

424 cmd = KafkaPublishCommand.from_cmd(cmd, batch=True) 

425 

426 cmd.destination = self.topic 

427 cmd.add_headers(self.headers, override=False) 

428 cmd.reply_to = cmd.reply_to or self.reply_to 

429 

430 cmd.partition = cmd.partition if cmd.partition is not None else self.partition 

431 cmd.key = cmd.key or self.key 

432 

433 await self._basic_publish_batch( 

434 cmd, 

435 producer=self._outer_config.producer, 

436 _extra_middlewares=_extra_middlewares, 

437 )