Coverage for faststream / redis / schemas / list_sub.py: 93%

14 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from copy import deepcopy 

2from functools import cached_property 

3 

4from faststream._internal.proto import NameRequired 

5 

6 

7class ListSub(NameRequired): 

8 """A class to represent a Redis List subscriber.""" 

9 

10 __slots__ = ( 

11 "batch", 

12 "max_records", 

13 "name", 

14 "polling_interval", 

15 ) 

16 

17 def __init__( 

18 self, 

19 list_name: str, 

20 batch: bool = False, 

21 max_records: int = 10, 

22 polling_interval: float = 0.1, 

23 ) -> None: 

24 super().__init__(list_name) 

25 

26 self.batch = batch 

27 self.max_records = max_records 

28 self.polling_interval = polling_interval 

29 

30 @cached_property 

31 def records(self) -> int | None: 

32 return self.max_records if self.batch else None 

33 

34 def add_prefix(self, prefix: str) -> "ListSub": 

35 new_list = deepcopy(self) 

36 new_list.name = f"{prefix}{new_list.name}" 

37 return new_list