Coverage for faststream / redis / configs / broker.py: 93%

15 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from dataclasses import dataclass 

2from typing import TYPE_CHECKING 

3 

4from faststream._internal.configs import BrokerConfig 

5from faststream.exceptions import IncorrectState 

6 

7if TYPE_CHECKING: 

8 from faststream.redis.parser import MessageFormat 

9 from faststream.redis.publisher.producer import RedisFastProducer 

10 

11 from .state import ConnectionState 

12 

13 

@dataclass(kw_only=True)
class RedisBrokerConfig(BrokerConfig):
    """Broker-level configuration bundling the Redis producer and connection.

    All fields are keyword-only (``kw_only=True``).
    """

    # Producer that is handed the serializer when the config connects.
    producer: "RedisFastProducer"
    # Connection state object whose connect/disconnect coroutines are awaited.
    connection: "ConnectionState"

    # Message format class (a type, not an instance).
    message_format: type["MessageFormat"]

    async def connect(self) -> None:
        """Wire the producer to the serializer, then open the connection.

        The producer is configured first; only afterwards is
        ``connection.connect()`` awaited.
        """
        # NOTE(review): ``fd_config`` is not declared here — presumably
        # inherited from ``BrokerConfig``; confirm against the base class.
        serializer = self.fd_config._serializer
        self.producer.connect(serializer)
        await self.connection.connect()

    async def disconnect(self) -> None:
        """Await ``connection.disconnect()``."""
        await self.connection.disconnect()

27 

28 

@dataclass(kw_only=True)
class RedisRouterConfig(BrokerConfig):
    """Router-level configuration whose ``connection`` is never available.

    Reading ``connection`` on this config always raises ``IncorrectState``.
    """

    @property
    def connection(self) -> "ConnectionState":
        """Always raise ``IncorrectState``; this property never returns.

        The return annotation mirrors ``RedisBrokerConfig.connection`` so both
        config classes expose the same attribute type to static checkers. It
        is a forward reference because ``ConnectionState`` is imported only
        under ``TYPE_CHECKING``.

        Raises:
            IncorrectState: on every access.
        """
        # NOTE(review): presumably a router config is not meant to own a
        # connection, hence the unconditional raise — confirm with callers.
        raise IncorrectState