Coverage for faststream / _internal / broker / registrator.py: 98%
42 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from abc import abstractmethod
2from collections.abc import Iterable, Sequence
3from typing import TYPE_CHECKING, Any, Generic
4from weakref import WeakSet
6from faststream._internal.configs import BrokerConfig, BrokerConfigType, ConfigComposition
7from faststream._internal.types import BrokerMiddleware, MsgType
9if TYPE_CHECKING:
10 from fast_depends.dependencies import Dependant
12 from faststream._internal.endpoint.publisher import PublisherUsecase
13 from faststream._internal.endpoint.subscriber import SubscriberUsecase
class Registrator(Generic[MsgType, BrokerConfigType]):
    """Common base for brokers and routers.

    Holds only the logic for registering subscribers and publishers and for
    nesting routers inside one another.
    """

    def __init__(
        self,
        *,
        config: BrokerConfigType,
        routers: Iterable["Registrator[MsgType]"],
    ) -> None:
        """Set up empty registries and include the given routers.

        Args:
            config: Broker configuration; its ``broker_parser`` and
                ``broker_decoder`` are stored on the instance and the config
                itself is wrapped in a ``ConfigComposition``.
            routers: Routers to include right away via ``include_routers``.
        """
        self._parser = config.broker_parser
        self._decoder = config.broker_decoder

        self.config: ConfigComposition[BrokerConfigType] = ConfigComposition(config)

        # Weak registries: an endpoint stays here only while something else
        # holds a strong reference to it.
        self._subscribers: WeakSet[SubscriberUsecase[MsgType]] = WeakSet()
        self._publishers: WeakSet[PublisherUsecase] = WeakSet()
        self.routers: list[Registrator[MsgType, Any]] = []

        # Strong references for endpoints registered with ``persistent=True``;
        # they keep the matching weak-set entries alive as long as this
        # registrator exists.
        self.__persistent_subscribers: list[SubscriberUsecase[MsgType]] = []
        self.__persistent_publishers: list[PublisherUsecase] = []

        self.__parent: Registrator[MsgType, Any] | None = None

        # Must run last: including a router touches ``self.config`` and
        # ``self.routers``, which are created above.
        self.include_routers(*routers)

    @property
    def subscribers(self) -> list["SubscriberUsecase[MsgType]"]:
        """Own subscribers followed by those of every included router, recursively."""
        collected = list(self._subscribers)
        for nested in self.routers:
            collected.extend(nested.subscribers)
        return collected

    @property
    def publishers(self) -> list["PublisherUsecase"]:
        """Own publishers followed by those of every included router, recursively."""
        collected = list(self._publishers)
        for nested in self.routers:
            collected.extend(nested.publishers)
        return collected

    def add_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
        """Append a BrokerMiddleware to the end of the middlewares list.

        The middleware becomes the innermost one of the stack.
        """
        self.config.add_middleware(middleware)

    def insert_middleware(self, middleware: "BrokerMiddleware[Any, Any]") -> None:
        """Insert a BrokerMiddleware at the start of the middlewares list.

        The middleware becomes the outermost one of the stack.
        """
        self.config.insert_middleware(middleware)

    @abstractmethod
    def subscriber(
        self,
        subscriber: "SubscriberUsecase[MsgType]",
        persistent: bool = True,
    ) -> "SubscriberUsecase[MsgType]":
        """Register a subscriber on this object and return it.

        Args:
            subscriber: Subscriber to add to the weak registry.
            persistent: When true, also keep a strong reference so the
                subscriber is not dropped from the weak registry.

        Returns:
            The same subscriber that was passed in.
        """
        # NOTE(review): marked abstract yet carries real logic - presumably
        # subclasses override it and delegate here via ``super()``; confirm.
        self._subscribers.add(subscriber)
        if persistent:
            self.__persistent_subscribers.append(subscriber)
        return subscriber

    @abstractmethod
    def publisher(
        self,
        publisher: "PublisherUsecase",
        persistent: bool = True,
    ) -> "PublisherUsecase":
        """Register a publisher on this object and return it.

        Args:
            publisher: Publisher to add to the weak registry.
            persistent: When true, also keep a strong reference so the
                publisher is not dropped from the weak registry.

        Returns:
            The same publisher that was passed in.
        """
        # NOTE(review): marked abstract yet carries real logic - presumably
        # subclasses override it and delegate here via ``super()``; confirm.
        self._publishers.add(publisher)
        if persistent:
            self.__persistent_publishers.append(publisher)
        return publisher

    def include_router(
        self,
        router: "Registrator[MsgType, Any]",
        *,
        prefix: str = "",
        dependencies: Iterable["Dependant"] = (),
        middlewares: Sequence["BrokerMiddleware[MsgType]"] = (),
        include_in_schema: bool | None = None,
    ) -> None:
        """Include a router in the current object.

        Does nothing when the router's parent is already this object.

        Args:
            router: Router to attach; this object becomes its parent.
            prefix: Passed to ``BrokerConfig`` as ``prefix``.
            dependencies: Passed to ``BrokerConfig`` as ``broker_dependencies``.
            middlewares: Passed to ``BrokerConfig`` as ``broker_middlewares``.
            include_in_schema: Passed to ``BrokerConfig`` as ``include_in_schema``.
        """
        if router.parent is self:
            return

        # Assigning the parent also detaches the router from a previous,
        # different parent (see the ``parent`` setter).
        router.parent = self

        overrides = BrokerConfig(
            prefix=prefix,
            include_in_schema=include_in_schema,
            broker_middlewares=middlewares,
            broker_dependencies=dependencies,
        )
        # Only a truthy options config is layered onto the router
        # (presumably it is falsy when every option is left at its default).
        if overrides:
            router.config.add_config(overrides)

        # The include-time options are added before this object's own config.
        router.config.add_config(self.config)
        self.routers.append(router)

    @property
    def parent(self) -> "Registrator[MsgType, Any] | None":
        """The object this one is currently included in, or ``None``."""
        return self.__parent

    @parent.setter
    def parent(self, parent: "Registrator[MsgType, Any]") -> None:
        """Re-parent this object.

        When it already has a different parent, it is first removed from that
        parent's ``routers`` list and its own config composition is reset.
        """
        current = self.__parent
        if current is not None and current is not parent:
            current.routers.remove(self)
            self.config.reset()
        self.__parent = parent

    def include_routers(
        self,
        *routers: "Registrator[MsgType, Any]",
    ) -> None:
        """Include each of the given routers, in order, with default options."""
        for router in routers:
            self.include_router(router)