Coverage for faststream / nats / schemas / pull_sub.py: 88%

13 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import Literal, Optional, Union, overload 

2 

3 

4class PullSub: 

5 """A class to represent a NATS pull subscription. 

6 

7 Args: 

8 batch_size (int): Consuming messages batch size. (default is `1`). 

9 timeout (:obj:`float`, optional): Wait this time for required batch size will be accumulated in stream 

10 in seconds (default is `5.0`). 

11 batch (bool): Whether to propagate consuming batch as iterable object to your handler (default is `False`). 

12 """ 

13 

14 __slots__ = ( 

15 "batch", 

16 "batch_size", 

17 "timeout", 

18 ) 

19 

20 def __init__( 

21 self, 

22 batch_size: int = 1, 

23 timeout: float | None = 5.0, 

24 batch: bool = False, 

25 ) -> None: 

26 self.batch_size = batch_size 

27 self.batch = batch 

28 self.timeout = timeout 

29 

30 @overload 

31 @classmethod 

32 def validate(cls, value: Literal[True]) -> "PullSub": ... 

33 

34 @overload 

35 @classmethod 

36 def validate(cls, value: Literal[False]) -> None: ... 

37 

38 @overload 

39 @classmethod 

40 def validate(cls, value: "PullSub") -> "PullSub": ... 

41 

42 @overload 

43 @classmethod 

44 def validate(cls, value: Union[bool, "PullSub"]) -> Optional["PullSub"]: ... 

45 

46 @classmethod 

47 def validate(cls, value: Union[bool, "PullSub"]) -> Optional["PullSub"]: 

48 if value is True: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true

49 return PullSub() 

50 if value is False: 

51 return None 

52 return value