Coverage for faststream / redis / configs / state.py: 86%
24 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import Any
3from redis.asyncio.client import Redis
4from redis.asyncio.connection import ConnectionPool
6from faststream.__about__ import __version__
7from faststream.exceptions import IncorrectState
10class ConnectionState:
11 def __init__(self, options: dict[str, Any] | None = None) -> None:
12 self._options = options or {}
14 self._connected = False
15 self._client: Redis[bytes] | None = None
17 @property
18 def client(self) -> "Redis[bytes]":
19 if not self._client: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true
20 msg = "Connection is not available yet. Please, connect the broker first."
21 raise IncorrectState(msg)
23 return self._client
25 def __bool__(self) -> bool:
26 return self._connected
28 async def connect(self) -> "Redis[bytes]":
29 pool = ConnectionPool(
30 **self._options,
31 lib_name="faststream",
32 lib_version=__version__,
33 )
34 client: Redis[bytes] = Redis.from_pool(pool) # type: ignore[attr-defined]
36 self._client = client
37 self._connected = True
39 return client
41 async def disconnect(self) -> None:
42 if self._client:
43 await self._client.aclose() # type: ignore[attr-defined]
45 self._client = None
46 self._connected = False