Coverage for faststream / nats / helpers / stream_builder.py: 76%
21 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import Optional, Union
3from faststream.nats.schemas import JStream, SubjectsCollection
6class StreamBuilder:
7 """A class to register stream-subjects pairs in Broker/Router."""
9 __slots__ = ("objects",)
11 def __init__(self) -> None:
12 # stores stream: SubjectsCollection pairs
13 # where SubjectsCollection contains subjects
14 # made by current builder only
15 self.objects: dict[str, tuple[JStream, SubjectsCollection]] = {}
17 def __contains__(self, value: Union["JStream", str, None], /) -> bool:
18 if stream := JStream.validate(value):
19 return stream.name in self.objects
20 return False
22 def create(
23 self,
24 name: Union[str, "JStream", None],
25 ) -> Optional["JStream"]:
26 """Get an object."""
27 if (stream := JStream.validate(name)) and (stream.name not in self.objects):
28 self.objects[stream.name] = (stream, stream.subjects.copy())
29 return stream
31 def get(
32 self,
33 stream: Union["JStream", str, None],
34 default: tuple["JStream", "SubjectsCollection"] | None = None,
35 ) -> tuple["JStream", "SubjectsCollection"] | None:
36 if stream := JStream.validate(stream): 36 ↛ 38line 36 didn't jump to line 38 because the condition on line 36 was always true
37 return self.objects.get(stream.name, default)
38 return default
40 def add_subject(
41 self,
42 stream: Union["JStream", str, None],
43 subject: str,
44 ) -> None:
45 if (stream := JStream.validate(stream)) and subject:
46 stream, subjects = self.objects.get(
47 stream.name,
48 (stream, stream.subjects.copy()),
49 )
50 subjects.append(subject)
51 self.objects[stream.name] = (stream, subjects)