Coverage for faststream / nats / helpers / bucket_declarer.py: 90%
16 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Optional
3from nats.js.api import KeyValueConfig
5from .state import ConnectedState, ConnectionState, EmptyConnectionState
7if TYPE_CHECKING:
8 from nats.js import JetStreamContext
9 from nats.js.api import Placement, RePublish, StorageType
10 from nats.js.kv import KeyValue
13class KVBucketDeclarer:
14 buckets: dict[str, "KeyValue"]
16 def __init__(self) -> None:
17 self.buckets = {}
19 self.__state: ConnectionState[JetStreamContext] = EmptyConnectionState()
21 def connect(self, connection: "JetStreamContext") -> None:
22 self.__state = ConnectedState(connection)
24 def disconnect(self) -> None:
25 self.__state = EmptyConnectionState()
27 async def create_key_value(
28 self,
29 bucket: str,
30 *,
31 description: str | None = None,
32 max_value_size: int | None = None,
33 history: int = 1,
34 ttl: float | None = None, # in seconds
35 max_bytes: int | None = None,
36 storage: Optional["StorageType"] = None,
37 replicas: int = 1,
38 placement: Optional["Placement"] = None,
39 republish: Optional["RePublish"] = None,
40 direct: bool | None = None,
41 # custom
42 declare: bool = True,
43 ) -> "KeyValue":
44 if (key_value := self.buckets.get(bucket)) is None:
45 if declare: 45 ↛ 62line 45 didn't jump to line 62 because the condition on line 45 was always true
46 key_value = await self.__state.connection.create_key_value(
47 config=KeyValueConfig(
48 bucket=bucket,
49 description=description,
50 max_value_size=max_value_size,
51 history=history,
52 ttl=ttl,
53 max_bytes=max_bytes,
54 storage=storage,
55 replicas=replicas,
56 placement=placement,
57 republish=republish,
58 direct=direct,
59 ),
60 )
61 else:
62 key_value = await self.__state.connection.key_value(bucket)
64 self.buckets[bucket] = key_value
66 return key_value