Coverage for faststream / nats / helpers / obj_storage_declarer.py: 90%
16 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Optional
3from nats.js.api import ObjectStoreConfig
5from .state import ConnectedState, ConnectionState, EmptyConnectionState
7if TYPE_CHECKING:
8 from nats.js import JetStreamContext
9 from nats.js.api import Placement, StorageType
10 from nats.js.object_store import ObjectStore
13class OSBucketDeclarer:
14 buckets: dict[str, "ObjectStore"]
16 def __init__(self) -> None:
17 self.buckets = {}
19 self.__state: ConnectionState[JetStreamContext] = EmptyConnectionState()
21 def connect(self, connection: "JetStreamContext") -> None:
22 self.__state = ConnectedState(connection)
24 def disconnect(self) -> None:
25 self.__state = EmptyConnectionState()
27 async def create_object_store(
28 self,
29 bucket: str,
30 *,
31 description: str | None = None,
32 ttl: float | None = None,
33 max_bytes: int | None = None,
34 storage: Optional["StorageType"] = None,
35 replicas: int = 1,
36 placement: Optional["Placement"] = None,
37 # custom
38 declare: bool = True,
39 ) -> "ObjectStore":
40 if (object_store := self.buckets.get(bucket)) is None:
41 if declare: 41 ↛ 55line 41 didn't jump to line 55 because the condition on line 41 was always true
42 object_store = await self.__state.connection.create_object_store(
43 bucket=bucket,
44 config=ObjectStoreConfig(
45 bucket=bucket,
46 description=description,
47 ttl=ttl,
48 max_bytes=max_bytes,
49 storage=storage,
50 replicas=replicas,
51 placement=placement,
52 ),
53 )
54 else:
55 object_store = await self.__state.connection.object_store(bucket)
57 self.buckets[bucket] = object_store
59 return object_store