Coverage for faststream / confluent / publisher / usecase.py: 96%

56 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from collections.abc import Iterable 

3from typing import TYPE_CHECKING, Any, Literal, Union, cast, overload 

4 

5from confluent_kafka import Message 

6from typing_extensions import override 

7 

8from faststream._internal.endpoint.publisher import ( 

9 PublisherSpecification, 

10 PublisherUsecase, 

11) 

12from faststream.confluent.response import KafkaPublishCommand 

13from faststream.message import gen_cor_id 

14from faststream.response.publish_type import PublishType 

15 

16if TYPE_CHECKING: 

17 from faststream._internal.basic_types import SendableMessage 

18 from faststream._internal.types import PublisherMiddleware 

19 from faststream.confluent.message import KafkaMessage 

20 from faststream.response.response import PublishCommand 

21 

22 from .config import KafkaPublisherConfig 

23 from .producer import AsyncConfluentFastProducer 

24 

25 

26class LogicPublisher(PublisherUsecase): 

27 """A class to publish messages to a Kafka topic.""" 

28 

29 def __init__( 

30 self, 

31 config: "KafkaPublisherConfig", 

32 specifcication: "PublisherSpecification[Any, Any]", 

33 ) -> None: 

34 super().__init__(config, specifcication) 

35 

36 self._topic = config.topic 

37 self.partition = config.partition 

38 self.reply_to = config.reply_to 

39 self.headers = config.headers or {} 

40 

41 @property 

42 def topic(self) -> str: 

43 return f"{self._outer_config.prefix}{self._topic}" 

44 

45 @override 

46 async def request( 

47 self, 

48 message: "SendableMessage", 

49 topic: str = "", 

50 *, 

51 key: bytes | str | None = None, 

52 partition: int | None = None, 

53 timestamp_ms: int | None = None, 

54 headers: dict[str, str] | None = None, 

55 correlation_id: str | None = None, 

56 timeout: float = 0.5, 

57 ) -> "KafkaMessage": 

58 cmd = KafkaPublishCommand( 

59 message, 

60 topic=topic or self.topic, 

61 key=key, 

62 partition=partition if partition is not None else self.partition, 

63 headers=self.headers | (headers or {}), 

64 correlation_id=correlation_id or gen_cor_id(), 

65 timestamp_ms=timestamp_ms, 

66 timeout=timeout, 

67 _publish_type=PublishType.REQUEST, 

68 ) 

69 

70 msg: KafkaMessage = await self._basic_request( 

71 cmd, 

72 producer=self._outer_config.producer, 

73 ) 

74 return msg 

75 

76 async def flush(self) -> None: 

77 producer = cast("AsyncConfluentFastProducer", self._outer_config.producer) 

78 await producer.flush() 

79 

80 

81class DefaultPublisher(LogicPublisher): 

82 def __init__( 

83 self, 

84 config: "KafkaPublisherConfig", 

85 specifcication: "PublisherSpecification[Any, Any]", 

86 ) -> None: 

87 super().__init__(config, specifcication) 

88 

89 self.key = config.key 

90 

91 @overload 

92 async def publish( 

93 self, 

94 message: "SendableMessage", 

95 topic: str = "", 

96 *, 

97 key: bytes | str | None = None, 

98 partition: int | None = None, 

99 timestamp_ms: int | None = None, 

100 headers: dict[str, str] | None = None, 

101 correlation_id: str | None = None, 

102 reply_to: str = "", 

103 no_confirm: Literal[True] = ..., 

104 ) -> asyncio.Future[Message | None]: ... 

105 

106 @overload 

107 async def publish( 

108 self, 

109 message: "SendableMessage", 

110 topic: str = "", 

111 *, 

112 key: bytes | str | None = None, 

113 partition: int | None = None, 

114 timestamp_ms: int | None = None, 

115 headers: dict[str, str] | None = None, 

116 correlation_id: str | None = None, 

117 reply_to: str = "", 

118 no_confirm: Literal[False] = False, 

119 ) -> Message | None: ... 

120 

121 @overload 

122 async def publish( 

123 self, 

124 message: "SendableMessage", 

125 topic: str = "", 

126 *, 

127 key: bytes | str | None = None, 

128 partition: int | None = None, 

129 timestamp_ms: int | None = None, 

130 headers: dict[str, str] | None = None, 

131 correlation_id: str | None = None, 

132 reply_to: str = "", 

133 no_confirm: bool = False, 

134 ) -> asyncio.Future[Message | None] | Message | None: ... 

135 

136 @override 

137 async def publish( 

138 self, 

139 message: "SendableMessage", 

140 topic: str = "", 

141 *, 

142 key: bytes | str | None = None, 

143 partition: int | None = None, 

144 timestamp_ms: int | None = None, 

145 headers: dict[str, str] | None = None, 

146 correlation_id: str | None = None, 

147 reply_to: str = "", 

148 no_confirm: bool = False, 

149 ) -> asyncio.Future[Message | None] | Message | None: 

150 cmd = KafkaPublishCommand( 

151 message, 

152 topic=topic or self.topic, 

153 key=key or self.key, 

154 partition=partition if partition is not None else self.partition, 

155 reply_to=reply_to or self.reply_to, 

156 headers=self.headers | (headers or {}), 

157 correlation_id=correlation_id or gen_cor_id(), 

158 timestamp_ms=timestamp_ms, 

159 no_confirm=no_confirm, 

160 _publish_type=PublishType.PUBLISH, 

161 ) 

162 msg: asyncio.Future[Message | None] | Message | None = await self._basic_publish( 

163 cmd, 

164 producer=self._outer_config.producer, 

165 _extra_middlewares=(), 

166 ) 

167 return msg 

168 

169 @override 

170 async def _publish( 

171 self, 

172 cmd: Union["PublishCommand", "KafkaPublishCommand"], 

173 *, 

174 _extra_middlewares: Iterable["PublisherMiddleware"], 

175 ) -> None: 

176 """This method should be called in subscriber flow only.""" 

177 cmd = KafkaPublishCommand.from_cmd(cmd) 

178 

179 cmd.destination = self.topic 

180 cmd.add_headers(self.headers, override=False) 

181 cmd.reply_to = cmd.reply_to or self.reply_to 

182 

183 cmd.partition = cmd.partition if cmd.partition is not None else self.partition 

184 cmd.key = cmd.key or self.key 

185 

186 await self._basic_publish( 

187 cmd, 

188 producer=self._outer_config.producer, 

189 _extra_middlewares=_extra_middlewares, 

190 ) 

191 

192 @override 

193 async def request( 

194 self, 

195 message: "SendableMessage", 

196 topic: str = "", 

197 *, 

198 key: bytes | str | None = None, 

199 partition: int | None = None, 

200 timestamp_ms: int | None = None, 

201 headers: dict[str, str] | None = None, 

202 correlation_id: str | None = None, 

203 timeout: float = 0.5, 

204 ) -> "KafkaMessage": 

205 return await super().request( 

206 message, 

207 topic=topic, 

208 key=key or self.key, 

209 partition=partition, 

210 timestamp_ms=timestamp_ms, 

211 headers=headers, 

212 correlation_id=correlation_id, 

213 timeout=timeout, 

214 ) 

215 

216 

217class BatchPublisher(LogicPublisher): 

218 def __init__( 

219 self, 

220 config: "KafkaPublisherConfig", 

221 specification: "PublisherSpecification[Any, Any]", 

222 ) -> None: 

223 super().__init__(config, specification) 

224 self.key = config.key 

225 

226 @override 

227 async def publish( 

228 self, 

229 *messages: "SendableMessage", 

230 topic: str = "", 

231 key: bytes | str | None = None, 

232 partition: int | None = None, 

233 timestamp_ms: int | None = None, 

234 headers: dict[str, str] | None = None, 

235 correlation_id: str | None = None, 

236 reply_to: str = "", 

237 no_confirm: bool = False, 

238 ) -> None: 

239 cmd = KafkaPublishCommand( 

240 *messages, 

241 key=key or self.key, 

242 topic=topic or self.topic, 

243 partition=partition if partition is not None else self.partition, 

244 reply_to=reply_to or self.reply_to, 

245 headers=self.headers | (headers or {}), 

246 correlation_id=correlation_id or gen_cor_id(), 

247 timestamp_ms=timestamp_ms, 

248 no_confirm=no_confirm, 

249 _publish_type=PublishType.PUBLISH, 

250 ) 

251 

252 await self._basic_publish_batch( 

253 cmd, 

254 producer=self._outer_config.producer, 

255 _extra_middlewares=(), 

256 ) 

257 

258 @override 

259 async def _publish( 

260 self, 

261 cmd: Union["PublishCommand", "KafkaPublishCommand"], 

262 *, 

263 _extra_middlewares: Iterable["PublisherMiddleware"], 

264 ) -> None: 

265 """This method should be called in subscriber flow only.""" 

266 cmd = KafkaPublishCommand.from_cmd(cmd, batch=True) 

267 

268 cmd.destination = self.topic 

269 cmd.add_headers(self.headers, override=False) 

270 cmd.reply_to = cmd.reply_to or self.reply_to 

271 

272 cmd.partition = cmd.partition if cmd.partition is not None else self.partition 

273 cmd.key = cmd.key or self.key 

274 

275 await self._basic_publish_batch( 

276 cmd, 

277 producer=self._outer_config.producer, 

278 _extra_middlewares=_extra_middlewares, 

279 )