Coverage for faststream / nats / helpers / bucket_declarer.py: 90%

16 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Optional 

2 

3from nats.js.api import KeyValueConfig 

4 

5from .state import ConnectedState, ConnectionState, EmptyConnectionState 

6 

7if TYPE_CHECKING: 

8 from nats.js import JetStreamContext 

9 from nats.js.api import Placement, RePublish, StorageType 

10 from nats.js.kv import KeyValue 

11 

12 

13class KVBucketDeclarer: 

14 buckets: dict[str, "KeyValue"] 

15 

16 def __init__(self) -> None: 

17 self.buckets = {} 

18 

19 self.__state: ConnectionState[JetStreamContext] = EmptyConnectionState() 

20 

21 def connect(self, connection: "JetStreamContext") -> None: 

22 self.__state = ConnectedState(connection) 

23 

24 def disconnect(self) -> None: 

25 self.__state = EmptyConnectionState() 

26 

27 async def create_key_value( 

28 self, 

29 bucket: str, 

30 *, 

31 description: str | None = None, 

32 max_value_size: int | None = None, 

33 history: int = 1, 

34 ttl: float | None = None, # in seconds 

35 max_bytes: int | None = None, 

36 storage: Optional["StorageType"] = None, 

37 replicas: int = 1, 

38 placement: Optional["Placement"] = None, 

39 republish: Optional["RePublish"] = None, 

40 direct: bool | None = None, 

41 # custom 

42 declare: bool = True, 

43 ) -> "KeyValue": 

44 if (key_value := self.buckets.get(bucket)) is None: 

45 if declare: 45 ↛ 62line 45 didn't jump to line 62 because the condition on line 45 was always true

46 key_value = await self.__state.connection.create_key_value( 

47 config=KeyValueConfig( 

48 bucket=bucket, 

49 description=description, 

50 max_value_size=max_value_size, 

51 history=history, 

52 ttl=ttl, 

53 max_bytes=max_bytes, 

54 storage=storage, 

55 replicas=replicas, 

56 placement=placement, 

57 republish=republish, 

58 direct=direct, 

59 ), 

60 ) 

61 else: 

62 key_value = await self.__state.connection.key_value(bucket) 

63 

64 self.buckets[bucket] = key_value 

65 

66 return key_value