Coverage for faststream / redis / publisher / usecase.py: 92%

106 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from abc import abstractmethod 

2from collections.abc import Iterable 

3from typing import TYPE_CHECKING, Any, Optional, Union 

4 

5from typing_extensions import override 

6 

7from faststream._internal.endpoint.publisher import ( 

8 PublisherSpecification, 

9 PublisherUsecase, 

10) 

11from faststream.message import gen_cor_id 

12from faststream.redis.response import RedisPublishCommand 

13from faststream.response.publish_type import PublishType 

14 

15from .producer import RedisFastProducer 

16 

17if TYPE_CHECKING: 

18 from redis.asyncio.client import Pipeline 

19 

20 from faststream._internal.basic_types import SendableMessage 

21 from faststream._internal.types import PublisherMiddleware 

22 from faststream.redis.message import RedisChannelMessage 

23 from faststream.redis.schemas import ListSub, PubSub, StreamSub 

24 from faststream.response import PublishCommand 

25 

26 from .config import RedisPublisherConfig 

27 

28 

class LogicPublisher(PublisherUsecase):
    """Shared base for the Redis publishers declared in this module.

    Keeps the publisher config, the default ``reply_to`` / ``headers`` taken
    from it, and the producer object used to send commands.
    """

    def __init__(
        self,
        config: "RedisPublisherConfig",
        specification: "PublisherSpecification[Any, Any]",
    ) -> None:
        """Store the config and derive publisher-level defaults from it.

        Args:
            config: Publisher configuration; ``reply_to``, ``headers`` and
                ``_outer_config.producer`` are read from it.
            specification: Forwarded unchanged to the parent constructor.
        """
        super().__init__(config, specification)

        self.config = config

        self.reply_to = config.reply_to
        # A falsy ``config.headers`` is normalised to an empty dict so the
        # subclasses can merge headers without a None check.
        self.headers = config.headers or {}

        # Until ``start`` runs, the producer from the outer config is used.
        self.producer = self.config._outer_config.producer

    async def start(self) -> None:
        """Start the parent usecase, then install a ``RedisFastProducer``.

        The new producer takes the connection and serializer from the outer
        config, the custom parser/decoder functions from the outer producer,
        and the message format from this publisher's config.
        """
        await super().start()

        outer = self.config._outer_config
        shared_producer = outer.producer

        self.producer = RedisFastProducer(
            connection=outer.connection,
            parser=shared_producer._parser.custom_func,
            decoder=shared_producer._decoder.custom_func,
            message_format=self.config.message_format,
            serializer=outer.fd_config._serializer,
        )

    @abstractmethod
    def subscriber_property(self, *, name_only: bool) -> dict[str, Any]:
        """Return the destination mapping for this publisher.

        Subclasses in this module return a dict with the keys ``"channel"``,
        ``"list"`` and ``"stream"``.
        """
        raise NotImplementedError

62 

63 

class ChannelPublisher(LogicPublisher):
    """Publisher whose destination is a Redis ``PubSub`` channel."""

    def __init__(
        self,
        config: "RedisPublisherConfig",
        specification: "PublisherSpecification[Any, Any]",
        *,
        channel: "PubSub",
    ) -> None:
        """Initialise the base publisher and remember the raw channel."""
        super().__init__(config, specification)

        self._channel = channel

    @property
    def channel(self) -> "PubSub":
        """The stored channel with the outer config's prefix applied."""
        return self._channel.add_prefix(self._outer_config.prefix)

    @override
    def subscriber_property(self, *, name_only: bool) -> dict[str, Any]:
        """Describe the destination: only the ``"channel"`` key is populated.

        Args:
            name_only: When true the channel's ``name`` is returned instead of
                the channel object itself.
        """
        target = self.channel
        return {
            "channel": target.name if name_only else target,
            "list": None,
            "stream": None,
        }

    @override
    async def publish(
        self,
        message: "SendableMessage" = None,
        channel: str | None = None,
        reply_to: str = "",
        headers: dict[str, Any] | None = None,
        correlation_id: str | None = None,
        *,
        pipeline: Optional["Pipeline[bytes]"] = None,
    ) -> int:
        """Build a PUBLISH command for the channel and send it.

        Args:
            message: Payload handed to ``RedisPublishCommand``.
            channel: Destination name; a falsy value selects the publisher's
                own (prefixed) channel name.
            reply_to: Falls back to the publisher's ``reply_to`` when empty.
            headers: Merged over the publisher's default headers, so keys
                given here win on conflict.
            correlation_id: Generated with ``gen_cor_id()`` when falsy.
            pipeline: Passed through to the command unchanged.

        Returns:
            Whatever ``_basic_publish`` returns for the command.
        """
        command = RedisPublishCommand(
            message,
            channel=channel or self.channel.name,
            reply_to=reply_to or self.reply_to,
            headers=self.headers | (headers or {}),
            correlation_id=correlation_id or gen_cor_id(),
            pipeline=pipeline,
            _publish_type=PublishType.PUBLISH,
            message_format=self.config.message_format,
        )
        outcome: int = await self._basic_publish(
            command,
            producer=self.producer,
            _extra_middlewares=(),
        )
        return outcome

    @override
    async def _publish(
        self,
        cmd: Union["PublishCommand", "RedisPublishCommand"],
        *,
        _extra_middlewares: Iterable["PublisherMiddleware"],
    ) -> None:
        """This method should be called in subscriber flow only."""
        redis_cmd = RedisPublishCommand.from_cmd(
            cmd,
            message_format=self.config.message_format,
        )

        # The destination is always this publisher's own channel.
        redis_cmd.set_destination(channel=self.channel.name)

        # Publisher defaults are added with ``override=False``; an existing
        # ``reply_to`` on the command takes precedence over the default.
        redis_cmd.add_headers(self.headers, override=False)
        redis_cmd.reply_to = redis_cmd.reply_to or self.reply_to

        await self._basic_publish(
            redis_cmd,
            producer=self.producer,
            _extra_middlewares=_extra_middlewares,
        )

    @override
    async def request(
        self,
        message: "SendableMessage" = None,
        channel: str | None = None,
        *,
        correlation_id: str | None = None,
        headers: dict[str, Any] | None = None,
        timeout: float | None = 30.0,
    ) -> "RedisChannelMessage":
        """Build a REQUEST command for the channel and await the reply.

        Args:
            message: Payload handed to ``RedisPublishCommand``.
            channel: Destination name; a falsy value selects the publisher's
                own (prefixed) channel name.
            correlation_id: Generated with ``gen_cor_id()`` when falsy.
            headers: Merged over the publisher's default headers.
            timeout: Passed to the command as its ``timeout``.

        Returns:
            The message produced by ``_basic_request``.
        """
        command = RedisPublishCommand(
            message,
            channel=channel or self.channel.name,
            headers=self.headers | (headers or {}),
            correlation_id=correlation_id or gen_cor_id(),
            timeout=timeout,
            _publish_type=PublishType.REQUEST,
            message_format=self.config.message_format,
        )

        reply: RedisChannelMessage = await self._basic_request(
            command,
            producer=self.producer,
        )
        return reply

162 

163 

class ListPublisher(LogicPublisher):
    """Publisher whose destination is a Redis list (``ListSub``)."""

    def __init__(
        self,
        config: "RedisPublisherConfig",
        specification: "PublisherSpecification[Any, Any]",
        *,
        list: "ListSub",
    ) -> None:
        """Initialise the base publisher and remember the raw list."""
        super().__init__(config, specification)

        self._list = list

    @property
    def list(self) -> "ListSub":
        """The stored list with the outer config's prefix applied."""
        return self._list.add_prefix(self._outer_config.prefix)

    @override
    def subscriber_property(self, *, name_only: bool) -> dict[str, Any]:
        """Describe the destination: only the ``"list"`` key is populated.

        Args:
            name_only: When true the list's ``name`` is returned instead of
                the list object itself.
        """
        target = self.list
        return {
            "channel": None,
            "list": target.name if name_only else target,
            "stream": None,
        }

    @override
    async def publish(
        self,
        message: "SendableMessage" = None,
        list: str | None = None,
        reply_to: str = "",
        headers: dict[str, Any] | None = None,
        correlation_id: str | None = None,
        *,
        pipeline: Optional["Pipeline[bytes]"] = None,
    ) -> int:
        """Build a PUBLISH command for the list and send it.

        Args:
            message: Payload handed to ``RedisPublishCommand``.
            list: Destination name; a falsy value selects the publisher's own
                (prefixed) list name.
            reply_to: Falls back to the publisher's ``reply_to`` when empty.
            headers: Merged over the publisher's default headers, so keys
                given here win on conflict.
            correlation_id: Generated with ``gen_cor_id()`` when falsy.
            pipeline: Passed through to the command unchanged.

        Returns:
            Whatever ``_basic_publish`` returns for the command.
        """
        command = RedisPublishCommand(
            message,
            list=list or self.list.name,
            reply_to=reply_to or self.reply_to,
            headers=self.headers | (headers or {}),
            correlation_id=correlation_id or gen_cor_id(),
            pipeline=pipeline,
            _publish_type=PublishType.PUBLISH,
            message_format=self.config.message_format,
        )

        outcome: int = await self._basic_publish(
            command,
            producer=self.producer,
            _extra_middlewares=(),
        )
        return outcome

    @override
    async def _publish(
        self,
        cmd: Union["PublishCommand", "RedisPublishCommand"],
        *,
        _extra_middlewares: Iterable["PublisherMiddleware"],
    ) -> None:
        """This method should be called in subscriber flow only."""
        redis_cmd = RedisPublishCommand.from_cmd(
            cmd,
            message_format=self.config.message_format,
        )

        # The destination is always this publisher's own list.
        redis_cmd.set_destination(list=self.list.name)

        # Publisher defaults are added with ``override=False``; an existing
        # ``reply_to`` on the command takes precedence over the default.
        redis_cmd.add_headers(self.headers, override=False)
        redis_cmd.reply_to = redis_cmd.reply_to or self.reply_to

        await self._basic_publish(
            redis_cmd,
            producer=self.producer,
            _extra_middlewares=_extra_middlewares,
        )

    @override
    async def request(
        self,
        message: "SendableMessage" = None,
        list: str | None = None,
        *,
        correlation_id: str | None = None,
        headers: dict[str, Any] | None = None,
        timeout: float | None = 30.0,
    ) -> "RedisChannelMessage":
        """Build a REQUEST command for the list and await the reply.

        Args:
            message: Payload handed to ``RedisPublishCommand``.
            list: Destination name; a falsy value selects the publisher's own
                (prefixed) list name.
            correlation_id: Generated with ``gen_cor_id()`` when falsy.
            headers: Merged over the publisher's default headers.
            timeout: Passed to the command as its ``timeout``.

        Returns:
            The message produced by ``_basic_request``.
        """
        command = RedisPublishCommand(
            message,
            list=list or self.list.name,
            headers=self.headers | (headers or {}),
            correlation_id=correlation_id or gen_cor_id(),
            timeout=timeout,
            _publish_type=PublishType.REQUEST,
            message_format=self.config.message_format,
        )

        reply: RedisChannelMessage = await self._basic_request(
            command,
            producer=self.producer,
        )
        return reply

263 

264 

class ListBatchPublisher(ListPublisher):
    """List publisher that sends several messages as one batch command."""

    @override
    async def publish(  # type: ignore[override]
        self,
        *messages: "SendableMessage",
        list: str | None = None,
        correlation_id: str | None = None,
        reply_to: str = "",
        headers: dict[str, Any] | None = None,
        pipeline: Optional["Pipeline[bytes]"] = None,
    ) -> int:
        """Build one PUBLISH command carrying all ``messages`` and send it.

        Args:
            *messages: Payloads handed positionally to ``RedisPublishCommand``.
            list: Destination name. Optional, like in
                ``ListPublisher.publish``: a falsy value selects the
                publisher's own (prefixed) list name. It used to be a required
                keyword even though the body already had this fallback.
            correlation_id: Generated with ``gen_cor_id()`` when falsy.
            reply_to: Falls back to the publisher's ``reply_to`` when empty.
            headers: Merged over the publisher's default headers, so keys
                given here win on conflict.
            pipeline: Passed through to the command unchanged.

        Returns:
            Whatever ``_basic_publish_batch`` returns for the command.
        """
        cmd = RedisPublishCommand(
            *messages,
            list=list or self.list.name,
            reply_to=reply_to or self.reply_to,
            headers=self.headers | (headers or {}),
            correlation_id=correlation_id or gen_cor_id(),
            pipeline=pipeline,
            _publish_type=PublishType.PUBLISH,
            message_format=self.config.message_format,
        )

        result: int = await self._basic_publish_batch(
            cmd,
            producer=self.producer,
            _extra_middlewares=(),
        )
        return result

    @override
    async def _publish(
        self,
        cmd: Union["PublishCommand", "RedisPublishCommand"],
        *,
        _extra_middlewares: Iterable["PublisherMiddleware"],
    ) -> None:
        """This method should be called in subscriber flow only."""
        # ``batch=True`` is the only difference from ListPublisher._publish
        # at conversion time; sending goes through ``_basic_publish_batch``.
        cmd = RedisPublishCommand.from_cmd(
            cmd,
            batch=True,
            message_format=self.config.message_format,
        )

        # The destination is always this publisher's own list.
        cmd.set_destination(list=self.list.name)

        # Publisher defaults are added with ``override=False``; an existing
        # ``reply_to`` on the command takes precedence over the default.
        cmd.add_headers(self.headers, override=False)
        cmd.reply_to = cmd.reply_to or self.reply_to

        await self._basic_publish_batch(
            cmd,
            producer=self.producer,
            _extra_middlewares=_extra_middlewares,
        )

316 

317 

class StreamPublisher(LogicPublisher):
    """Publisher whose destination is a Redis stream (``StreamSub``)."""

    def __init__(
        self,
        config: "RedisPublisherConfig",
        specification: "PublisherSpecification[Any, Any]",
        *,
        stream: "StreamSub",
    ) -> None:
        """Initialise the base publisher and remember the raw stream."""
        super().__init__(config, specification)
        self._stream = stream

    @property
    def stream(self) -> "StreamSub":
        """The stored stream with the outer config's prefix applied."""
        return self._stream.add_prefix(self._outer_config.prefix)

    @override
    def subscriber_property(self, *, name_only: bool) -> dict[str, Any]:
        """Describe the destination: only the ``"stream"`` key is populated.

        Args:
            name_only: When true the stream's ``name`` is returned instead of
                the stream object itself.
        """
        target = self.stream
        return {
            "channel": None,
            "list": None,
            "stream": target.name if name_only else target,
        }

    @override
    async def publish(
        self,
        message: "SendableMessage" = None,
        stream: str | None = None,
        reply_to: str = "",
        headers: dict[str, Any] | None = None,
        correlation_id: str | None = None,
        *,
        maxlen: int | None = None,
        pipeline: Optional["Pipeline[bytes]"] = None,
    ) -> bytes:
        """Build a PUBLISH command for the stream and send it.

        Args:
            message: Payload handed to ``RedisPublishCommand``.
            stream: Destination name; a falsy value selects the publisher's
                own (prefixed) stream name.
            reply_to: Falls back to the publisher's ``reply_to`` when empty.
            headers: Merged over the publisher's default headers, so keys
                given here win on conflict.
            correlation_id: Generated with ``gen_cor_id()`` when falsy.
            maxlen: A falsy value (``None`` or ``0``) falls back to the
                stream's own ``maxlen``.
            pipeline: Passed through to the command unchanged.

        Returns:
            Whatever ``_basic_publish`` returns for the command.
        """
        command = RedisPublishCommand(
            message,
            stream=stream or self.stream.name,
            reply_to=reply_to or self.reply_to,
            headers=self.headers | (headers or {}),
            correlation_id=correlation_id or gen_cor_id(),
            maxlen=maxlen or self.stream.maxlen,
            pipeline=pipeline,
            _publish_type=PublishType.PUBLISH,
            message_format=self.config.message_format,
        )

        outcome: bytes = await self._basic_publish(
            command,
            producer=self.producer,
            _extra_middlewares=(),
        )
        return outcome

    @override
    async def _publish(
        self,
        cmd: Union["PublishCommand", "RedisPublishCommand"],
        *,
        _extra_middlewares: Iterable["PublisherMiddleware"],
    ) -> None:
        """This method should be called in subscriber flow only."""
        redis_cmd = RedisPublishCommand.from_cmd(
            cmd,
            message_format=self.config.message_format,
        )

        # The destination is always this publisher's own stream.
        redis_cmd.set_destination(stream=self.stream.name)

        # Publisher defaults are added with ``override=False``; an existing
        # ``reply_to`` on the command takes precedence over the default.
        redis_cmd.add_headers(self.headers, override=False)
        redis_cmd.reply_to = redis_cmd.reply_to or self.reply_to
        # Unlike ``reply_to``, ``maxlen`` is assigned unconditionally from the
        # stream settings.
        redis_cmd.maxlen = self.stream.maxlen

        await self._basic_publish(
            redis_cmd,
            producer=self.producer,
            _extra_middlewares=_extra_middlewares,
        )

    @override
    async def request(
        self,
        message: "SendableMessage" = None,
        stream: str | None = None,
        *,
        maxlen: int | None = None,
        correlation_id: str | None = None,
        headers: dict[str, Any] | None = None,
        timeout: float | None = 30.0,
    ) -> "RedisChannelMessage":
        """Build a REQUEST command for the stream and await the reply.

        Args:
            message: Payload handed to ``RedisPublishCommand``.
            stream: Destination name; a falsy value selects the publisher's
                own (prefixed) stream name.
            maxlen: A falsy value (``None`` or ``0``) falls back to the
                stream's own ``maxlen``.
            correlation_id: Generated with ``gen_cor_id()`` when falsy.
            headers: Merged over the publisher's default headers.
            timeout: Passed to the command as its ``timeout``.

        Returns:
            The message produced by ``_basic_request``.
        """
        command = RedisPublishCommand(
            message,
            stream=stream or self.stream.name,
            headers=self.headers | (headers or {}),
            correlation_id=correlation_id or gen_cor_id(),
            maxlen=maxlen or self.stream.maxlen,
            timeout=timeout,
            _publish_type=PublishType.REQUEST,
            message_format=self.config.message_format,
        )

        reply: RedisChannelMessage = await self._basic_request(
            command,
            producer=self.producer,
        )
        return reply