Coverage for faststream / nats / schemas / pull_sub.py: 88%
13 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import Literal, Optional, Union, overload
4class PullSub:
5 """A class to represent a NATS pull subscription.
7 Args:
8 batch_size (int): Consuming messages batch size. (default is `1`).
9 timeout (:obj:`float`, optional): Wait this time for required batch size will be accumulated in stream
10 in seconds (default is `5.0`).
11 batch (bool): Whether to propagate consuming batch as iterable object to your handler (default is `False`).
12 """
14 __slots__ = (
15 "batch",
16 "batch_size",
17 "timeout",
18 )
20 def __init__(
21 self,
22 batch_size: int = 1,
23 timeout: float | None = 5.0,
24 batch: bool = False,
25 ) -> None:
26 self.batch_size = batch_size
27 self.batch = batch
28 self.timeout = timeout
30 @overload
31 @classmethod
32 def validate(cls, value: Literal[True]) -> "PullSub": ...
34 @overload
35 @classmethod
36 def validate(cls, value: Literal[False]) -> None: ...
38 @overload
39 @classmethod
40 def validate(cls, value: "PullSub") -> "PullSub": ...
42 @overload
43 @classmethod
44 def validate(cls, value: Union[bool, "PullSub"]) -> Optional["PullSub"]: ...
46 @classmethod
47 def validate(cls, value: Union[bool, "PullSub"]) -> Optional["PullSub"]:
48 if value is True: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true
49 return PullSub()
50 if value is False:
51 return None
52 return value