Coverage for faststream / nats / broker / state.py: 81%
27 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING
3from faststream.exceptions import IncorrectState
5if TYPE_CHECKING:
6 from nats.aio.client import Client
7 from nats.js import JetStreamContext
10class BrokerState:
11 def __init__(self) -> None:
12 self._connected = False
14 self._stream: JetStreamContext | None = None
15 self._connection: Client | None = None
17 @property
18 def connection(self) -> "Client":
19 if not self._connection: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true
20 msg = "Connection is not available yet. Please, connect the broker first."
21 raise IncorrectState(msg)
22 return self._connection
24 @property
25 def stream(self) -> "JetStreamContext":
26 if not self._stream: 26 ↛ 27line 26 didn't jump to line 27 because the condition on line 26 was never true
27 msg = "Stream is not available yet. Please, connect the broker first."
28 raise IncorrectState(msg)
29 return self._stream
31 def __bool__(self) -> bool:
32 return self._connected
34 def connect(self, connection: "Client", stream: "JetStreamContext") -> None:
35 self._connection = connection
36 self._stream = stream
37 self._connected = True
39 def disconnect(self) -> None:
40 self._connection = None
41 self._stream = None
42 self._connected = False