Coverage for faststream / nats / subscriber / state.py: 0%

39 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Protocol 

2 

3from faststream.exceptions import IncorrectState 

4 

5if TYPE_CHECKING: 

6 from nats.aio.client import Client 

7 from nats.js import JetStreamContext 

8 

9 from faststream.nats.broker.state import BrokerState 

10 from faststream.nats.helpers import KVBucketDeclarer, OSBucketDeclarer 

11 

12 

class SubscriberState(Protocol):
    """Interface implemented by the subscriber state classes in this module.

    A state exposes the NATS ``Client``, the ``JetStreamContext`` and the
    two bucket declarers (key-value and object-store) through one uniform
    set of attributes.
    """

    @property
    def client(self) -> "Client":
        """Return the ``nats.aio.client.Client`` for this state."""
        ...

    @property
    def js(self) -> "JetStreamContext":
        """Return the ``nats.js.JetStreamContext`` for this state."""
        ...

    # Declared as plain attributes (not read-only properties), so
    # implementations may either store them directly or expose them via
    # a property with a setter.
    kv_declarer: "KVBucketDeclarer"
    os_declarer: "OSBucketDeclarer"

22 

23 

class EmptySubscriberState(SubscriberState):
    """State in which every accessor raises ``IncorrectState``.

    Reading ``client``, ``js``, ``kv_declarer`` or ``os_declarer``, as well
    as assigning to either declarer, raises ``IncorrectState`` with a
    message telling the user to set up the subscriber first.
    """

    @property
    def client(self) -> "Client":
        """Always raise ``IncorrectState``: no connection is available."""
        msg = "Connection is not available yet. Please, setup the subscriber first."
        raise IncorrectState(msg)

    @property
    def js(self) -> "JetStreamContext":
        """Always raise ``IncorrectState``: no stream is available."""
        msg = "Stream is not available yet. Please, setup the subscriber first."
        raise IncorrectState(msg)

    @property
    def kv_declarer(self) -> "KVBucketDeclarer":
        """Always raise ``IncorrectState``: no key-value declarer is available."""
        msg = "KeyValue is not available yet. Please, setup the subscriber first."
        raise IncorrectState(msg)

    # NOTE(review): both declarer setters report "Connection ..." while the
    # matching getters name the specific store; kept as-is — confirm intent.
    @kv_declarer.setter
    def kv_declarer(self, v: "KVBucketDeclarer") -> None:
        """Reject assignment by raising ``IncorrectState``."""
        msg = "Connection is not available yet. Please, setup the subscriber first."
        raise IncorrectState(msg)

    @property
    def os_declarer(self) -> "OSBucketDeclarer":
        """Always raise ``IncorrectState``: no object-store declarer is available."""
        msg = "ObjectStorage is not available yet. Please, setup the subscriber first."
        raise IncorrectState(msg)

    @os_declarer.setter
    def os_declarer(self, v: "OSBucketDeclarer") -> None:
        """Reject assignment by raising ``IncorrectState``."""
        msg = "Connection is not available yet. Please, setup the subscriber first."
        raise IncorrectState(msg)

54 

55 

class ConnectedSubscriberState(SubscriberState):
    """State backed by a broker state and two concrete bucket declarers.

    ``client`` and ``js`` are not stored here; each access reads them from
    the broker state passed in as ``parent_state``.
    """

    def __init__(
        self,
        *,
        parent_state: "BrokerState",
        kv_declarer: "KVBucketDeclarer",
        os_declarer: "OSBucketDeclarer",
    ) -> None:
        """Store the broker state and the declarers.

        Args:
            parent_state: Broker state whose ``connection`` and ``stream``
                attributes back ``client`` and ``js``.
            kv_declarer: Declarer exposed as ``kv_declarer``.
            os_declarer: Declarer exposed as ``os_declarer``.
        """
        self._parent_state = parent_state
        self.kv_declarer = kv_declarer
        self.os_declarer = os_declarer

    @property
    def client(self) -> "Client":
        """Return ``connection`` from the parent broker state."""
        return self._parent_state.connection

    @property
    def js(self) -> "JetStreamContext":
        """Return ``stream`` from the parent broker state."""
        return self._parent_state.stream