Coverage for faststream / redis / configs / state.py: 86%

24 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import Any 

2 

3from redis.asyncio.client import Redis 

4from redis.asyncio.connection import ConnectionPool 

5 

6from faststream.__about__ import __version__ 

7from faststream.exceptions import IncorrectState 

8 

9 

10class ConnectionState: 

11 def __init__(self, options: dict[str, Any] | None = None) -> None: 

12 self._options = options or {} 

13 

14 self._connected = False 

15 self._client: Redis[bytes] | None = None 

16 

17 @property 

18 def client(self) -> "Redis[bytes]": 

19 if not self._client: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true

20 msg = "Connection is not available yet. Please, connect the broker first." 

21 raise IncorrectState(msg) 

22 

23 return self._client 

24 

25 def __bool__(self) -> bool: 

26 return self._connected 

27 

28 async def connect(self) -> "Redis[bytes]": 

29 pool = ConnectionPool( 

30 **self._options, 

31 lib_name="faststream", 

32 lib_version=__version__, 

33 ) 

34 client: Redis[bytes] = Redis.from_pool(pool) # type: ignore[attr-defined] 

35 

36 self._client = client 

37 self._connected = True 

38 

39 return client 

40 

41 async def disconnect(self) -> None: 

42 if self._client: 

43 await self._client.aclose() # type: ignore[attr-defined] 

44 

45 self._client = None 

46 self._connected = False