Coverage for faststream / redis / configs / broker.py: 93%
15 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from dataclasses import dataclass
2from typing import TYPE_CHECKING
4from faststream._internal.configs import BrokerConfig
5from faststream.exceptions import IncorrectState
7if TYPE_CHECKING:
8 from faststream.redis.parser import MessageFormat
9 from faststream.redis.publisher.producer import RedisFastProducer
11 from .state import ConnectionState
14@dataclass(kw_only=True)
15class RedisBrokerConfig(BrokerConfig):
16 producer: "RedisFastProducer"
17 connection: "ConnectionState"
19 message_format: type["MessageFormat"]
21 async def connect(self) -> None:
22 self.producer.connect(self.fd_config._serializer)
23 await self.connection.connect()
25 async def disconnect(self) -> None:
26 await self.connection.disconnect()
29@dataclass(kw_only=True)
30class RedisRouterConfig(BrokerConfig):
31 @property
32 def connection(self) -> ConnectionError:
33 raise IncorrectState