Coverage for faststream / redis / schemas / list_sub.py: 93%
14 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from copy import deepcopy
2from functools import cached_property
4from faststream._internal.proto import NameRequired
class ListSub(NameRequired):
    """A class to represent a Redis List subscriber.

    Args:
        list_name: Name of the list. It is handed to the parent initializer;
            ``add_prefix`` later reads and rewrites it through ``self.name``.
        batch: Whether batch mode is enabled. Controls what ``records``
            reports.
        max_records: Value reported by ``records`` while ``batch`` is truthy.
        polling_interval: Stored unchanged on the instance; it is not used
            inside this class (presumably a delay in seconds -- confirm
            against the consumer code).
    """

    __slots__ = (
        "batch",
        "max_records",
        "name",
        "polling_interval",
    )

    def __init__(
        self,
        list_name: str,
        batch: bool = False,
        max_records: int = 10,
        polling_interval: float = 0.1,
    ) -> None:
        super().__init__(list_name)

        self.batch = batch
        self.max_records = max_records
        self.polling_interval = polling_interval

    # A plain property instead of ``functools.cached_property``:
    # ``cached_property`` writes its result into the instance ``__dict__``,
    # which a class declaring ``__slots__`` cannot rely on, and a cached value
    # would go stale if ``batch`` or ``max_records`` were reassigned after the
    # first read. The computation is a single conditional, so there is nothing
    # worth caching.
    @property
    def records(self) -> int | None:
        """Return ``max_records`` when ``batch`` is truthy, otherwise ``None``."""
        return self.max_records if self.batch else None

    def add_prefix(self, prefix: str) -> "ListSub":
        """Return a deep copy of this object whose name starts with ``prefix``.

        The instance the method is called on is left unmodified; only the
        copy receives the new name.
        """
        new_list = deepcopy(self)
        new_list.name = f"{prefix}{new_list.name}"
        return new_list