Coverage for faststream / _internal / broker / registrator.py: 98%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from abc import abstractmethod 

2from collections.abc import Iterable, Sequence 

3from typing import TYPE_CHECKING, Any, Generic 

4from weakref import WeakSet 

5 

6from faststream._internal.configs import BrokerConfig, BrokerConfigType, ConfigComposition 

7from faststream._internal.types import BrokerMiddleware, MsgType 

8 

9if TYPE_CHECKING: 

10 from fast_depends.dependencies import Dependant 

11 

12 from faststream._internal.endpoint.publisher import PublisherUsecase 

13 from faststream._internal.endpoint.subscriber import SubscriberUsecase 

14 

15 

class Registrator(Generic[MsgType, BrokerConfigType]):
    """Basic class for brokers and routers.

    Contains subscribers & publishers registration logic only.

    Endpoints registered directly on this object are tracked in weak sets;
    the ones registered with ``persistent=True`` are additionally appended to
    private lists, which keeps a strong reference to them. Included routers
    are stored in ``routers`` and their endpoints are exposed through the
    aggregated ``subscribers`` / ``publishers`` properties.
    """

    def __init__(
        self,
        *,
        config: BrokerConfigType,
        routers: Iterable["Registrator[MsgType, Any]"],
    ) -> None:
        """Create empty registries and include the initial routers.

        Args:
            config: Broker configuration. Its ``broker_parser`` and
                ``broker_decoder`` are stored as ``_parser`` / ``_decoder``
                and the object itself is wrapped into a ``ConfigComposition``.
            routers: Routers passed one by one to ``include_router`` (with
                default options) at the end of initialization.
        """
        self._parser = config.broker_parser
        self._decoder = config.broker_decoder

        self.config: ConfigComposition[BrokerConfigType] = ConfigComposition(config)

        # Weak registries: an endpoint disappears from them once nothing else
        # references it.
        self._subscribers: WeakSet[SubscriberUsecase[MsgType]] = WeakSet()
        self._publishers: WeakSet[PublisherUsecase] = WeakSet()
        self.routers: list[Registrator[MsgType, Any]] = []

        # Strong references for endpoints registered with ``persistent=True``,
        # so that they stay alive (and therefore stay in the weak sets above).
        self.__persistent_subscribers: list[SubscriberUsecase[MsgType]] = []
        self.__persistent_publishers: list[PublisherUsecase] = []

        self.__parent: Registrator[MsgType, Any] | None = None

        # Must run last: ``include_router`` touches ``self.routers`` and
        # ``self.config`` created above.
        self.include_routers(*routers)

    @property
    def subscribers(self) -> list["SubscriberUsecase[MsgType]"]:
        """Return own subscribers followed by those of all included routers.

        A new list is built on every access; nested routers are traversed
        recursively through their own ``subscribers`` property.
        """
        return [*self._subscribers, *(sub for r in self.routers for sub in r.subscribers)]

    @property
    def publishers(self) -> list["PublisherUsecase"]:
        """Return own publishers followed by those of all included routers.

        A new list is built on every access; nested routers are traversed
        recursively through their own ``publishers`` property.
        """
        return [*self._publishers, *(pub for r in self.routers for pub in r.publishers)]

    def add_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
        """Append BrokerMiddleware to the end of middlewares list.

        The middleware will be used as the innermost one of the stack.
        Delegates to ``self.config.add_middleware``.
        """
        self.config.add_middleware(middleware)

    def insert_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
        """Insert BrokerMiddleware to the start of middlewares list.

        The middleware will be used as the outermost one of the stack.
        Delegates to ``self.config.insert_middleware``.
        """
        self.config.insert_middleware(middleware)

    # NOTE(review): no ABCMeta-based metaclass is visible on this class, so
    # ``@abstractmethod`` is presumably a marker here rather than an enforced
    # constraint; the body is real shared logic (likely reached via
    # ``super()`` from subclasses) -- confirm against the concrete brokers.
    @abstractmethod
    def subscriber(
        self,
        subscriber: "SubscriberUsecase[MsgType]",
        persistent: bool = True,
    ) -> "SubscriberUsecase[MsgType]":
        """Register a subscriber on this object and return it.

        Args:
            subscriber: Subscriber to add to the weak registry.
            persistent: When true, also keep a strong reference to the
                subscriber in a private list.

        Returns:
            The same ``subscriber`` object that was passed in.
        """
        self._subscribers.add(subscriber)
        if persistent:
            self.__persistent_subscribers.append(subscriber)
        return subscriber

    # NOTE(review): see the note on ``subscriber`` about ``@abstractmethod``.
    @abstractmethod
    def publisher(
        self,
        publisher: "PublisherUsecase",
        persistent: bool = True,
    ) -> "PublisherUsecase":
        """Register a publisher on this object and return it.

        Args:
            publisher: Publisher to add to the weak registry.
            persistent: When true, also keep a strong reference to the
                publisher in a private list.

        Returns:
            The same ``publisher`` object that was passed in.
        """
        self._publishers.add(publisher)
        if persistent:
            self.__persistent_publishers.append(publisher)
        return publisher

    def include_router(
        self,
        router: "Registrator[MsgType, Any]",
        *,
        prefix: str = "",
        dependencies: Iterable["Dependant"] = (),
        middlewares: Sequence["BrokerMiddleware[MsgType]"] = (),
        include_in_schema: bool | None = None,
    ) -> None:
        """Includes a router in the current object.

        Does nothing when ``router`` already has this object as its parent
        (the passed options are ignored in that case). Otherwise the router is
        re-parented to this object, the include options and this object's
        config are added to the router's config, and the router is appended to
        ``routers``.

        Args:
            router: Router to include.
            prefix: Passed to ``BrokerConfig(prefix=...)``.
            dependencies: Passed to ``BrokerConfig(broker_dependencies=...)``.
            middlewares: Passed to ``BrokerConfig(broker_middlewares=...)``.
            include_in_schema: Passed to
                ``BrokerConfig(include_in_schema=...)``.
        """
        if router.parent is self:
            return
        # The setter detaches the router from a previous parent, if any.
        router.parent = self

        # The options layer is only added when the config object is truthy;
        # presumably ``BrokerConfig`` is falsy when nothing was customized --
        # TODO confirm against ``BrokerConfig.__bool__``.
        if options_config := BrokerConfig(
            prefix=prefix,
            include_in_schema=include_in_schema,
            broker_middlewares=middlewares,
            broker_dependencies=dependencies,
        ):
            router.config.add_config(options_config)

        # Order matters: the include options are added before the parent config.
        router.config.add_config(self.config)
        self.routers.append(router)

    @property
    def parent(self) -> "Registrator[MsgType, Any] | None":
        """Return the object this one is included in, or ``None``."""
        return self.__parent

    @parent.setter
    def parent(self, parent: "Registrator[MsgType, Any]") -> None:
        """Store a new parent, detaching from a different previous one first.

        When a previous parent exists and is not the new one, this object is
        removed from that parent's ``routers`` list and ``self.config.reset()``
        is called (presumably dropping config layers added by the previous
        include -- verify against ``ConfigComposition.reset``).
        """
        if self.__parent is not None and parent is not self.__parent:
            self.__parent.routers.remove(self)
            self.config.reset()
        self.__parent = parent

    def include_routers(
        self,
        *routers: "Registrator[MsgType, Any]",
    ) -> None:
        """Includes routers in the object.

        Each router is passed to ``include_router`` with default options, in
        the order given.
        """
        for r in routers:
            self.include_router(r)