Coverage for faststream / nats / subscriber / state.py: 0%
39 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Protocol
3from faststream.exceptions import IncorrectState
5if TYPE_CHECKING:
6 from nats.aio.client import Client
7 from nats.js import JetStreamContext
9 from faststream.nats.broker.state import BrokerState
10 from faststream.nats.helpers import KVBucketDeclarer, OSBucketDeclarer
class SubscriberState(Protocol):
    """Structural interface describing the state a subscriber works with.

    Implementations provide two declarer attributes (``kv_declarer`` and
    ``os_declarer``) and two read-only properties (``client`` and ``js``).
    """

    # Declarer attributes every implementation must expose.
    kv_declarer: "KVBucketDeclarer"
    os_declarer: "OSBucketDeclarer"

    @property
    def js(self) -> "JetStreamContext":
        """Return the ``JetStreamContext`` associated with this state."""
        ...

    @property
    def client(self) -> "Client":
        """Return the ``Client`` associated with this state."""
        ...
class EmptySubscriberState(SubscriberState):
    """Subscriber state in which every accessor raises ``IncorrectState``.

    Reading ``client``, ``js``, ``kv_declarer`` or ``os_declarer`` -- and
    assigning to either declarer -- fails with a message asking the caller
    to set up the subscriber first.
    """

    @staticmethod
    def _not_ready(resource: str) -> IncorrectState:
        """Build (without raising) the error reported for ``resource``."""
        msg = f"{resource} is not available yet. Please, setup the subscriber first."
        return IncorrectState(msg)

    @property
    def client(self) -> "Client":
        """Always raise ``IncorrectState`` (reported as "Connection")."""
        raise self._not_ready("Connection")

    @property
    def js(self) -> "JetStreamContext":
        """Always raise ``IncorrectState`` (reported as "Stream")."""
        raise self._not_ready("Stream")

    @property
    def kv_declarer(self) -> "KVBucketDeclarer":
        """Always raise ``IncorrectState`` (reported as "KeyValue")."""
        raise self._not_ready("KeyValue")

    @kv_declarer.setter
    def kv_declarer(self, v: "KVBucketDeclarer") -> None:
        # NOTE(review): the setter reports "Connection" while the getter
        # reports "KeyValue" -- confirm this wording is intended.
        raise self._not_ready("Connection")

    @property
    def os_declarer(self) -> "OSBucketDeclarer":
        """Always raise ``IncorrectState`` (reported as "ObjectStorage")."""
        raise self._not_ready("ObjectStorage")

    @os_declarer.setter
    def os_declarer(self, v: "OSBucketDeclarer") -> None:
        # NOTE(review): the setter reports "Connection" while the getter
        # reports "ObjectStorage" -- confirm this wording is intended.
        raise self._not_ready("Connection")
class ConnectedSubscriberState(SubscriberState):
    """Subscriber state backed by a broker state and two bucket declarers.

    ``client`` and ``js`` are not stored here; each access reads them from
    the parent broker state passed to the constructor.
    """

    def __init__(
        self,
        *,
        parent_state: "BrokerState",
        kv_declarer: "KVBucketDeclarer",
        os_declarer: "OSBucketDeclarer",
    ) -> None:
        """Store the parent broker state and both declarers.

        Args:
            parent_state: Object whose ``connection`` and ``stream``
                attributes back the ``client`` and ``js`` properties.
            kv_declarer: Value exposed as ``self.kv_declarer``.
            os_declarer: Value exposed as ``self.os_declarer``.
        """
        self._parent_state = parent_state
        self.kv_declarer, self.os_declarer = kv_declarer, os_declarer

    @property
    def client(self) -> "Client":
        """Return ``connection`` from the parent broker state."""
        broker_state = self._parent_state
        return broker_state.connection

    @property
    def js(self) -> "JetStreamContext":
        """Return ``stream`` from the parent broker state."""
        broker_state = self._parent_state
        return broker_state.stream