Coverage for faststream / redis / broker / broker.py: 92%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import logging 

2from collections.abc import Iterable, Mapping, Sequence 

3from typing import ( 

4 TYPE_CHECKING, 

5 Any, 

6 Optional, 

7 TypeAlias, 

8 Union, 

9) 

10from urllib.parse import urlparse 

11 

12import anyio 

13from anyio import move_on_after 

14from fast_depends import Provider, dependency_provider 

15from redis.asyncio.connection import ( 

16 Connection, 

17 DefaultParser, 

18 Encoder, 

19 parse_url, 

20) 

21from redis.exceptions import ConnectionError 

22from typing_extensions import overload, override 

23 

24from faststream._internal.broker import BrokerUsecase 

25from faststream._internal.constants import EMPTY 

26from faststream._internal.context.repository import ContextRepo 

27from faststream._internal.di import FastDependsConfig 

28from faststream.message import gen_cor_id 

29from faststream.middlewares import AckPolicy 

30from faststream.redis.configs import ConnectionState, RedisBrokerConfig 

31from faststream.redis.message import UnifyRedisDict 

32from faststream.redis.parser import BinaryMessageFormatV1, MessageFormat 

33from faststream.redis.publisher.producer import RedisFastProducer 

34from faststream.redis.response import RedisPublishCommand 

35from faststream.redis.security import parse_security 

36from faststream.response.publish_type import PublishType 

37from faststream.specification.schema import BrokerSpec 

38 

39from .logging import make_redis_logger_state 

40from .registrator import RedisRegistrator 

41 

42if TYPE_CHECKING: 

43 from types import TracebackType 

44 

45 from fast_depends.dependencies import Dependant 

46 from fast_depends.library.serializer import SerializerProto 

47 from redis.asyncio.client import Pipeline, Redis 

48 from redis.asyncio.connection import BaseParser 

49 from typing_extensions import TypedDict 

50 

51 from faststream._internal.basic_types import LoggerProto, SendableMessage 

52 from faststream._internal.parser import CodecProto 

53 from faststream._internal.types import BrokerMiddleware, CustomCallable 

54 from faststream.redis.message import RedisChannelMessage 

55 from faststream.security import BaseSecurity 

56 from faststream.specification.schema.extra import Tag, TagDict 

57 

58 class RedisInitKwargs(TypedDict, total=False): 

59 host: str | None 

60 port: str | int | None 

61 db: str | int | None 

62 client_name: str | None 

63 health_check_interval: float | None 

64 max_connections: int | None 

65 socket_timeout: float | None 

66 socket_connect_timeout: float | None 

67 socket_read_size: int | None 

68 socket_keepalive: bool | None 

69 socket_keepalive_options: Mapping[int, int | bytes] | None 

70 socket_type: int | None 

71 retry_on_timeout: bool | None 

72 encoding: str | None 

73 encoding_errors: str | None 

74 parser_class: type["BaseParser"] | None 

75 connection_class: type["Connection"] | None 

76 encoder_class: type["Encoder"] | None 

77 

78 

79Channel: TypeAlias = str 

80 

81 

82class RedisBroker( 

83 RedisRegistrator, 

84 BrokerUsecase[UnifyRedisDict, "Redis[bytes]"], 

85): 

86 """Redis broker.""" 

87 

88 def __init__( 

89 self, 

90 url: str = "redis://localhost:6379", 

91 *, 

92 host: str = EMPTY, 

93 port: str | int = EMPTY, 

94 db: str | int = EMPTY, 

95 connection_class: type["Connection"] = EMPTY, 

96 client_name: str | None = None, 

97 health_check_interval: float = 0, 

98 max_connections: int | None = None, 

99 socket_timeout: float | None = None, 

100 socket_connect_timeout: float | None = None, 

101 socket_read_size: int = 65536, 

102 socket_keepalive: bool = False, 

103 socket_keepalive_options: Mapping[int, int | bytes] | None = None, 

104 socket_type: int = 0, 

105 retry_on_timeout: bool = False, 

106 encoding: str = "utf-8", 

107 encoding_errors: str = "strict", 

108 parser_class: type["BaseParser"] = DefaultParser, 

109 encoder_class: type["Encoder"] = Encoder, 

110 graceful_timeout: float | None = 15.0, 

111 ack_policy: AckPolicy = EMPTY, 

112 decoder: Optional["CustomCallable"] = None, 

113 codec: Optional["CodecProto"] = None, 

114 parser: Optional["CustomCallable"] = None, 

115 dependencies: Iterable["Dependant"] = (), 

116 middlewares: Sequence["BrokerMiddleware[Any, Any]"] = (), 

117 routers: Iterable[RedisRegistrator] = (), 

118 message_format: type["MessageFormat"] = BinaryMessageFormatV1, 

119 security: Optional["BaseSecurity"] = None, 

120 specification_url: str | None = None, 

121 protocol: str | None = None, 

122 protocol_version: str | None = "custom", 

123 description: str | None = None, 

124 tags: Iterable[Union["Tag", "TagDict"]] = (), 

125 logger: Optional["LoggerProto"] = EMPTY, 

126 log_level: int = logging.INFO, 

127 apply_types: bool = True, 

128 serializer: Optional["SerializerProto"] = EMPTY, 

129 provider: Optional["Provider"] = None, 

130 context: Optional["ContextRepo"] = None, 

131 ) -> None: 

132 """Initialized the RedisBroker. 

133 

134 Args: 

135 url: 

136 The Redis connection URL. Defaults to "redis://localhost:6379". 

137 host: 

138 The Redis host to connect to. If not provided, it will be extracted from the URL. 

139 port: 

140 The Redis port to connect to. If not provided, it will be extracted from the URL. 

141 db: 

142 The Redis database to use. If not provided, it will be extracted from the URL. 

143 connection_class: 

144 The class to use for establishing connections. Defaults to EMPTY. 

145 client_name: 

146 The name of the Redis client. Defaults to None. 

147 health_check_interval: 

148 The interval at which to perform health checks on the broker. Defaults to 0. 

149 max_connections: 

150 The maximum number of connections to establish. Defaults to None. 

151 socket_timeout: 

152 The timeout for socket operations. Defaults to None. 

153 socket_connect_timeout: 

154 The timeout for connecting sockets. Defaults to None. 

155 socket_read_size: 

156 The size of the buffer used for reading from sockets. Defaults to 65536. 

157 socket_keepalive: 

158 Whether to enable keep-alive on sockets. Defaults to False. 

159 socket_keepalive_options: 

160 Options for keep-alive on sockets. Defaults to None. 

161 socket_type: 

162 The type of socket to use (if supported by your platform). Defaults to 0. 

163 retry_on_timeout: 

164 Whether to retry operations that timeout. Defaults to False. 

165 encoding: 

166 The encoding used for sending and receiving data. Defaults to "utf-8". 

167 encoding_errors: 

168 How to handle encoding errors. Defaults to "strict". 

169 parser_class: 

170 The class to use for parsing messages. Defaults to DefaultParser. 

171 encoder_class: 

172 The class to use for encoding messages. Defaults to Encoder. 

173 graceful_timeout: 

174 Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down. Defaults to 15.0. 

175 ack_policy: 

176 Default acknowledgement policy for all subscribers. Individual subscribers can override. 

177 decoder: 

178 Custom decoder object. Defaults to None. 

179 codec: 

180 Custom codec object. Defaults to None. 

181 parser: 

182 Custom parser object. Defaults to None. 

183 dependencies: 

184 Dependencies to apply to all broker subscribers. Defaults to (). 

185 middlewares: 

186 Middlewares to apply to all broker publishers/subscribers. Defaults to (). 

187 routers: 

188 Routers to apply to broker. Defaults to (). 

189 message_format: 

190 What format to use when parsing messages. Defaults to BinaryMessageFormatV1. 

191 security: 

192 Security options to connect broker and generate AsyncAPI server security information. Defaults to None. 

193 specification_url: 

194 AsyncAPI hardcoded server addresses. Use `servers` if not specified. Defaults to None. 

195 protocol: 

196 AsyncAPI server protocol. Defaults to None. 

197 protocol_version: 

198 AsyncAPI server protocol version. Defaults to "custom". 

199 description: 

200 AsyncAPI server description. Defaults to None. 

201 tags: 

202 AsyncAPI server tags. Defaults to (). 

203 logger: 

204 User specified logger to pass into Context and log service messages. Defaults to EMPTY. 

205 log_level: 

206 Service messages log level. Defaults to logging.INFO. 

207 apply_types: 

208 Whether to use FastDepends or not. Defaults to True. 

209 serializer: 

210 Serializer object. Defaults to EMPTY. 

211 provider: 

212 Provider for FastDepends library. Defaults to None. 

213 context: 

214 Context repository for FastDepends library. Defaults to None. 

215 """ 

216 self.message_format = message_format 

217 

218 if specification_url is None: 

219 specification_url = url 

220 

221 if protocol is None: 

222 url_kwargs = urlparse(specification_url) 

223 protocol = url_kwargs.scheme 

224 

225 connection_options = _resolve_url_options( 

226 url, 

227 security=security, 

228 host=host, 

229 port=port, 

230 db=db, 

231 client_name=client_name, 

232 health_check_interval=health_check_interval, 

233 max_connections=max_connections, 

234 socket_timeout=socket_timeout, 

235 socket_connect_timeout=socket_connect_timeout, 

236 socket_read_size=socket_read_size, 

237 socket_keepalive=socket_keepalive, 

238 socket_keepalive_options=socket_keepalive_options, 

239 socket_type=socket_type, 

240 retry_on_timeout=retry_on_timeout, 

241 encoding=encoding, 

242 encoding_errors=encoding_errors, 

243 parser_class=parser_class, 

244 connection_class=connection_class, 

245 encoder_class=encoder_class, 

246 ) 

247 

248 connection_state = ConnectionState(connection_options) 

249 

250 super().__init__( 

251 **connection_options, 

252 routers=routers, 

253 config=RedisBrokerConfig( 

254 connection=connection_state, 

255 producer=RedisFastProducer( 

256 connection=connection_state, 

257 parser=parser, 

258 decoder=decoder, 

259 message_format=self.message_format, 

260 serializer=serializer, 

261 ), 

262 message_format=self.message_format, 

263 # both args 

264 broker_middlewares=middlewares, 

265 broker_parser=parser, 

266 broker_decoder=decoder, 

267 broker_codec=codec, 

268 logger=make_redis_logger_state( 

269 logger=logger, 

270 log_level=log_level, 

271 ), 

272 fd_config=FastDependsConfig( 

273 use_fastdepends=apply_types, 

274 serializer=serializer, 

275 provider=provider or dependency_provider, 

276 context=context or ContextRepo(), 

277 ), 

278 # subscriber args 

279 broker_dependencies=dependencies, 

280 graceful_timeout=graceful_timeout, 

281 ack_policy=ack_policy, 

282 extra_context={ 

283 "broker": self, 

284 }, 

285 ), 

286 specification=BrokerSpec( 

287 description=description, 

288 url=[specification_url], 

289 protocol=protocol, 

290 protocol_version=protocol_version, 

291 security=security, 

292 tags=tags, 

293 ), 

294 ) 

295 

296 @override 

297 async def _connect(self) -> "Redis[bytes]": 

298 await self.config.connect() 

299 return self.config.broker_config.connection.client 

300 

301 async def stop( 

302 self, 

303 exc_type: type[BaseException] | None = None, 

304 exc_val: BaseException | None = None, 

305 exc_tb: Optional["TracebackType"] = None, 

306 ) -> None: 

307 await super().stop(exc_type, exc_val, exc_tb) 

308 await self.config.disconnect() 

309 self._connection = None 

310 

311 async def start(self) -> None: 

312 await self.connect() 

313 await super().start() 

314 

315 @overload 

316 async def publish( 

317 self, 

318 message: "SendableMessage" = None, 

319 channel: str | None = None, 

320 *, 

321 reply_to: str = "", 

322 headers: dict[str, Any] | None = None, 

323 correlation_id: str | None = None, 

324 list: str | None = None, 

325 stream: None = None, 

326 maxlen: int | None = None, 

327 pipeline: Optional["Pipeline[bytes]"] = None, 

328 ) -> int: ... 

329 

330 @overload 

331 async def publish( 

332 self, 

333 message: "SendableMessage" = None, 

334 channel: str | None = None, 

335 *, 

336 reply_to: str = "", 

337 headers: dict[str, Any] | None = None, 

338 correlation_id: str | None = None, 

339 list: str | None = None, 

340 stream: str = ..., 

341 maxlen: int | None = None, 

342 pipeline: Optional["Pipeline[bytes]"] = None, 

343 ) -> bytes: ... 

344 

345 @override 

346 async def publish( 

347 self, 

348 message: "SendableMessage" = None, 

349 channel: str | None = None, 

350 *, 

351 reply_to: str = "", 

352 headers: dict[str, Any] | None = None, 

353 correlation_id: str | None = None, 

354 list: str | None = None, 

355 stream: str | None = None, 

356 maxlen: int | None = None, 

357 pipeline: Optional["Pipeline[bytes]"] = None, 

358 ) -> int | bytes: 

359 """Publish message directly. 

360 

361 This method allows you to publish a message in a non-AsyncAPI-documented way. 

362 It can be used in other frameworks or to publish messages at specific intervals. 

363 

364 Args: 

365 message: 

366 Message body to send. 

367 channel: 

368 Redis PubSub object name to send message. 

369 reply_to: 

370 Reply message destination PubSub object name. 

371 headers: 

372 Message headers to store metainformation. 

373 correlation_id: 

374 Manual message correlation_id setter. correlation_id is a useful option to trace messages. 

375 list: 

376 Redis List object name to send message. 

377 stream: 

378 Redis Stream object name to send message. 

379 maxlen: 

380 Redis Stream maxlen publish option. Remove eldest message if maxlen exceeded. 

381 pipeline: 

382 Redis pipeline to use for publishing messages. 

383 

384 Returns: 

385 int: The result of the publish operation, typically the number of messages published. 

386 """ 

387 cmd = RedisPublishCommand( 

388 message, 

389 correlation_id=correlation_id or gen_cor_id(), 

390 channel=channel, 

391 list=list, 

392 stream=stream, 

393 maxlen=maxlen, 

394 reply_to=reply_to, 

395 headers=headers, 

396 pipeline=pipeline, 

397 _publish_type=PublishType.PUBLISH, 

398 message_format=self.message_format, 

399 ) 

400 

401 result: int | bytes = await super()._basic_publish( 

402 cmd, 

403 producer=self.config.producer, 

404 ) 

405 return result 

406 

407 @override 

408 async def request( # type: ignore[override] 

409 self, 

410 message: "SendableMessage", 

411 channel: str | None = None, 

412 *, 

413 list: str | None = None, 

414 stream: str | None = None, 

415 maxlen: int | None = None, 

416 correlation_id: str | None = None, 

417 headers: dict[str, Any] | None = None, 

418 timeout: float | None = 30.0, 

419 ) -> "RedisChannelMessage": 

420 cmd = RedisPublishCommand( 

421 message, 

422 correlation_id=correlation_id or gen_cor_id(), 

423 channel=channel, 

424 list=list, 

425 stream=stream, 

426 maxlen=maxlen, 

427 headers=headers, 

428 timeout=timeout, 

429 _publish_type=PublishType.REQUEST, 

430 message_format=self.message_format, 

431 ) 

432 msg: RedisChannelMessage = await super()._basic_request( 

433 cmd, 

434 producer=self.config.producer, 

435 ) 

436 return msg 

437 

438 @override 

439 async def publish_batch( # type: ignore[override] 

440 self, 

441 *messages: "SendableMessage", 

442 list: str, 

443 correlation_id: str | None = None, 

444 reply_to: str = "", 

445 headers: dict[str, Any] | None = None, 

446 pipeline: Optional["Pipeline[bytes]"] = None, 

447 ) -> int: 

448 """Publish multiple messages to Redis List by one request. 

449 

450 Args: 

451 *messages: Messages bodies to send. 

452 list: Redis List object name to send messages. 

453 correlation_id: Manual message **correlation_id** setter. **correlation_id** is a useful option to trace messages. 

454 reply_to: Reply message destination PubSub object name. 

455 headers: Message headers to store metainformation. 

456 pipeline: Redis pipeline to use for publishing messages. 

457 

458 Returns: 

459 int: The result of the batch publish operation. 

460 """ 

461 cmd = RedisPublishCommand( 

462 *messages, 

463 list=list, 

464 reply_to=reply_to, 

465 headers=headers, 

466 correlation_id=correlation_id or gen_cor_id(), 

467 pipeline=pipeline, 

468 _publish_type=PublishType.PUBLISH, 

469 message_format=self.message_format, 

470 ) 

471 

472 result: int = await self._basic_publish_batch( 

473 cmd, 

474 producer=self.config.producer, 

475 ) 

476 return result 

477 

478 @override 

479 async def ping(self, timeout: float | None = 3) -> bool: 

480 sleep_time = (timeout or 10) / 10 

481 

482 with move_on_after(timeout) as cancel_scope: 

483 if self._connection is None: 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true

484 return False 

485 

486 while True: 

487 if cancel_scope.cancel_called: 487 ↛ 488line 487 didn't jump to line 488 because the condition on line 487 was never true

488 return False 

489 

490 try: 

491 if await self._connection.ping(): 491 ↛ 497line 491 didn't jump to line 497 because the condition on line 491 was always true

492 return True 

493 

494 except ConnectionError: 

495 pass 

496 

497 await anyio.sleep(sleep_time) 

498 

499 return False 

500 

501 

def _resolve_url_options(
    url: str,
    *,
    security: Optional["BaseSecurity"],
    **kwargs: Any,
) -> dict[str, Any]:
    """Merge connection options from the URL, security settings and kwargs.

    Sources are applied in increasing priority: options parsed from ``url``,
    then options produced by ``parse_security(security)``, then every keyword
    argument whose value is not the ``EMPTY`` sentinel.

    Args:
        url: Redis connection URL handed to ``parse_url``.
        security: Security object handed to ``parse_security``.
        **kwargs: Explicit connection options; ``EMPTY`` values are skipped.

    Returns:
        A new dict with the merged options.
    """
    options: dict[str, Any] = dict(parse_url(url))
    options.update(parse_security(security))
    # Only explicitly provided values may override the parsed ones.
    options.update(
        (name, value) for name, value in kwargs.items() if value is not EMPTY
    )
    return options