Coverage for faststream / nats / schemas / obj_watch.py: 89%
15 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import Literal, Optional, Union, overload
class ObjWatch:
    """A class to represent a NATS object storage watch subscription.

    Instances are plain option holders: the constructor stores each argument
    on the attribute of the same name and performs no validation.

    Args:
        ignore_deletes (bool): Ignore delete events (default is `False`).
        include_history (bool): Include history (default is `False`).
        meta_only (bool): Only metadata. (default is `False`).
        timeout (float): The timeout for the watch in seconds (default is `5.0`).
        declare (bool): Whether to create object storage automatically or just connect to it (default is `True`).
    """

    # No per-instance ``__dict__``: only the option names listed here can be set.
    __slots__ = (
        "declare",
        "ignore_deletes",
        "include_history",
        "meta_only",
        "timeout",
    )

    def __init__(
        self,
        ignore_deletes: bool = False,
        include_history: bool = False,
        meta_only: bool = False,
        timeout: float = 5.0,
        # custom (not a watcher flag; see ``declare`` in the class docstring)
        declare: bool = True,
    ) -> None:
        self.ignore_deletes = ignore_deletes
        self.include_history = include_history
        self.meta_only = meta_only
        self.timeout = timeout

        self.declare = declare

    @overload
    @classmethod
    def validate(cls, value: Literal[True]) -> "ObjWatch": ...

    @overload
    @classmethod
    def validate(cls, value: Literal[False]) -> None: ...

    @overload
    @classmethod
    def validate(cls, value: "ObjWatch") -> "ObjWatch": ...

    @overload
    @classmethod
    def validate(cls, value: Union[bool, "ObjWatch"]) -> Optional["ObjWatch"]: ...

    @classmethod
    def validate(cls, value: Union[bool, "ObjWatch"]) -> Optional["ObjWatch"]:
        """Normalize a ``bool``-or-instance option into an optional watch.

        Args:
            value: ``True`` to request a watch with default settings, ``False``
                to request no watch, or an already configured instance.

        Returns:
            A new default-constructed instance of ``cls`` for ``True``,
            ``None`` for ``False``; any other value is returned unchanged.
        """
        # Identity checks on purpose: only the real ``True``/``False``
        # singletons are special-cased, never merely truthy/falsy values.
        if value is True:
            # ``cls()`` rather than a hard-coded class name, so calling
            # ``validate`` on a subclass yields an instance of that subclass.
            return cls()
        if value is False:
            return None
        return value