Coverage for faststream / nats / schemas / obj_watch.py: 89%

15 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import Literal, Optional, Union, overload 

2 

3 

class ObjWatch:
    """A class to represent a NATS object storage watch subscription.

    Args:
        ignore_deletes (bool): Ignore delete events (default is `False`).
        include_history (bool): Include history (default is `False`).
        meta_only (bool): Only metadata. (default is `False`).
        timeout (float): The timeout for the watch in seconds (default is `5.0`).
        declare (bool): Whether to create object storage automatically or just connect to it (default is `True`).
    """

    __slots__ = (
        "declare",
        "ignore_deletes",
        "include_history",
        "meta_only",
        "timeout",
    )

    def __init__(
        self,
        ignore_deletes: bool = False,
        include_history: bool = False,
        meta_only: bool = False,
        timeout: float = 5.0,
        # custom
        declare: bool = True,
    ) -> None:
        self.ignore_deletes = ignore_deletes
        self.include_history = include_history
        self.meta_only = meta_only
        self.timeout = timeout

        self.declare = declare

    def __repr__(self) -> str:
        """Return a constructor-like representation listing every option."""
        return (
            f"{type(self).__name__}("
            f"ignore_deletes={self.ignore_deletes!r}, "
            f"include_history={self.include_history!r}, "
            f"meta_only={self.meta_only!r}, "
            f"timeout={self.timeout!r}, "
            f"declare={self.declare!r})"
        )

    @overload
    @classmethod
    def validate(cls, value: Literal[True]) -> "ObjWatch": ...

    @overload
    @classmethod
    def validate(cls, value: Literal[False]) -> None: ...

    @overload
    @classmethod
    def validate(cls, value: "ObjWatch") -> "ObjWatch": ...

    @overload
    @classmethod
    def validate(cls, value: Union[bool, "ObjWatch"]) -> Optional["ObjWatch"]: ...

    @classmethod
    def validate(cls, value: Union[bool, "ObjWatch"]) -> Optional["ObjWatch"]:
        """Normalize a watch option into a watch object or ``None``.

        Args:
            value: ``True`` to get a watch with default settings, ``False`` to
                get no watch at all, or an already-built watch object.

        Returns:
            A new default-configured instance for ``True``, ``None`` for
            ``False``; any other value is returned unchanged.
        """
        if isinstance(value, bool):
            # Build through ``cls`` rather than the hard-coded base class so
            # that a subclass calling ``validate(True)`` receives its own type.
            return cls() if value else None
        return value