Coverage for faststream / nats / helpers / obj_storage_declarer.py: 90%

16 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Optional 

2 

3from nats.js.api import ObjectStoreConfig 

4 

5from .state import ConnectedState, ConnectionState, EmptyConnectionState 

6 

7if TYPE_CHECKING: 

8 from nats.js import JetStreamContext 

9 from nats.js.api import Placement, StorageType 

10 from nats.js.object_store import ObjectStore 

11 

12 

13class OSBucketDeclarer: 

14 buckets: dict[str, "ObjectStore"] 

15 

16 def __init__(self) -> None: 

17 self.buckets = {} 

18 

19 self.__state: ConnectionState[JetStreamContext] = EmptyConnectionState() 

20 

21 def connect(self, connection: "JetStreamContext") -> None: 

22 self.__state = ConnectedState(connection) 

23 

24 def disconnect(self) -> None: 

25 self.__state = EmptyConnectionState() 

26 

27 async def create_object_store( 

28 self, 

29 bucket: str, 

30 *, 

31 description: str | None = None, 

32 ttl: float | None = None, 

33 max_bytes: int | None = None, 

34 storage: Optional["StorageType"] = None, 

35 replicas: int = 1, 

36 placement: Optional["Placement"] = None, 

37 # custom 

38 declare: bool = True, 

39 ) -> "ObjectStore": 

40 if (object_store := self.buckets.get(bucket)) is None: 

41 if declare: 41 ↛ 55line 41 didn't jump to line 55 because the condition on line 41 was always true

42 object_store = await self.__state.connection.create_object_store( 

43 bucket=bucket, 

44 config=ObjectStoreConfig( 

45 bucket=bucket, 

46 description=description, 

47 ttl=ttl, 

48 max_bytes=max_bytes, 

49 storage=storage, 

50 replicas=replicas, 

51 placement=placement, 

52 ), 

53 ) 

54 else: 

55 object_store = await self.__state.connection.object_store(bucket) 

56 

57 self.buckets[bucket] = object_store 

58 

59 return object_store