Coverage for faststream / nats / broker / state.py: 81%

27 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING 

2 

3from faststream.exceptions import IncorrectState 

4 

5if TYPE_CHECKING: 

6 from nats.aio.client import Client 

7 from nats.js import JetStreamContext 

8 

9 

10class BrokerState: 

11 def __init__(self) -> None: 

12 self._connected = False 

13 

14 self._stream: JetStreamContext | None = None 

15 self._connection: Client | None = None 

16 

17 @property 

18 def connection(self) -> "Client": 

19 if not self._connection: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true

20 msg = "Connection is not available yet. Please, connect the broker first." 

21 raise IncorrectState(msg) 

22 return self._connection 

23 

24 @property 

25 def stream(self) -> "JetStreamContext": 

26 if not self._stream: 26 ↛ 27line 26 didn't jump to line 27 because the condition on line 26 was never true

27 msg = "Stream is not available yet. Please, connect the broker first." 

28 raise IncorrectState(msg) 

29 return self._stream 

30 

31 def __bool__(self) -> bool: 

32 return self._connected 

33 

34 def connect(self, connection: "Client", stream: "JetStreamContext") -> None: 

35 self._connection = connection 

36 self._stream = stream 

37 self._connected = True 

38 

39 def disconnect(self) -> None: 

40 self._connection = None 

41 self._stream = None 

42 self._connected = False