Coverage for faststream / nats / helpers / stream_builder.py: 76%

21 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import Optional, Union 

2 

3from faststream.nats.schemas import JStream, SubjectsCollection 

4 

5 

6class StreamBuilder: 

7 """A class to register stream-subjects pairs in Broker/Router.""" 

8 

9 __slots__ = ("objects",) 

10 

11 def __init__(self) -> None: 

12 # stores stream: SubjectsCollection pairs 

13 # where SubjectsCollection contains subjects 

14 # made by current builder only 

15 self.objects: dict[str, tuple[JStream, SubjectsCollection]] = {} 

16 

17 def __contains__(self, value: Union["JStream", str, None], /) -> bool: 

18 if stream := JStream.validate(value): 

19 return stream.name in self.objects 

20 return False 

21 

22 def create( 

23 self, 

24 name: Union[str, "JStream", None], 

25 ) -> Optional["JStream"]: 

26 """Get an object.""" 

27 if (stream := JStream.validate(name)) and (stream.name not in self.objects): 

28 self.objects[stream.name] = (stream, stream.subjects.copy()) 

29 return stream 

30 

31 def get( 

32 self, 

33 stream: Union["JStream", str, None], 

34 default: tuple["JStream", "SubjectsCollection"] | None = None, 

35 ) -> tuple["JStream", "SubjectsCollection"] | None: 

36 if stream := JStream.validate(stream): 36 ↛ 38line 36 didn't jump to line 38 because the condition on line 36 was always true

37 return self.objects.get(stream.name, default) 

38 return default 

39 

40 def add_subject( 

41 self, 

42 stream: Union["JStream", str, None], 

43 subject: str, 

44 ) -> None: 

45 if (stream := JStream.validate(stream)) and subject: 

46 stream, subjects = self.objects.get( 

47 stream.name, 

48 (stream, stream.subjects.copy()), 

49 ) 

50 subjects.append(subject) 

51 self.objects[stream.name] = (stream, subjects)