Coverage for faststream/_internal/broker/broker.py: 98%
46 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from abc import abstractmethod
2from collections.abc import Iterable, Sequence
3from typing import TYPE_CHECKING, Any, Generic, Optional
5from fast_depends import Provider
6from typing_extensions import Self
8from faststream._internal.configs import BrokerConfigType
9from faststream._internal.types import (
10 BrokerMiddleware,
11 ConnectionType,
12 MsgType,
13)
15from .pub_base import BrokerPublishMixin
16from .registrator import Registrator
18if TYPE_CHECKING:
19 from types import TracebackType
21 from faststream._internal.context.repository import ContextRepo
22 from faststream._internal.di import FastDependsConfig
23 from faststream._internal.producer import ProducerProto
24 from faststream.specification.schema import BrokerSpec
class BrokerUsecase(
    Registrator[MsgType, BrokerConfigType],
    BrokerPublishMixin[MsgType],
    Generic[MsgType, ConnectionType, BrokerConfigType],
):
    """Common base for concrete broker implementations.

    Builds on `Registrator` and `BrokerPublishMixin`, adding connection
    handling, a start/stop lifecycle and a reference to the broker
    specification (AsyncAPI).
    """

    # Value produced by `_connect()`; `None` until `connect()` stores one.
    _connection: ConnectionType | None

    def __init__(
        self,
        *,
        config: BrokerConfigType,
        specification: "BrokerSpec",
        routers: Iterable[Registrator[Any, Any]],
        **connection_kwargs: Any,
    ) -> None:
        """Initialise the broker state.

        Args:
            config: Broker configuration, forwarded to the parent initialiser.
            specification: Specification object kept on the instance.
            routers: Registrators forwarded to the parent initialiser.
            **connection_kwargs: Extra keyword arguments stored as-is in
                `_connection_kwargs` (presumably consumed by subclasses'
                `_connect` -- not used in this class).
        """
        super().__init__(routers=routers, config=config)

        self.specification = specification
        self.running = False

        # No connection is opened here; `connect()` does it lazily.
        self._connection_kwargs = connection_kwargs
        self._connection = None

    @property
    def middlewares(self) -> Sequence["BrokerMiddleware[MsgType]"]:
        """Broker-level middlewares taken from the config."""
        broker_config = self.config
        return broker_config.broker_middlewares

    @property
    def _producer(self) -> "ProducerProto":
        """Producer object taken from the config."""
        broker_config = self.config
        return broker_config.producer

    @property
    def context(self) -> "ContextRepo":
        """Context repository of the current FastDepends config."""
        fd_config = self.config.fd_config
        return fd_config.context

    @property
    def provider(self) -> Provider:
        """Dependency provider of the current FastDepends config."""
        fd_config = self.config.fd_config
        return fd_config.provider

    async def __aenter__(self) -> "Self":
        """Connect on entering `async with` and return the broker itself."""
        await self.connect()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: Optional["TracebackType"],
    ) -> None:
        """Forward the exit information to `stop()` on leaving `async with`."""
        await self.stop(exc_type, exc_val, exc_tb)

    def _update_fd_config(self, config: "FastDependsConfig") -> None:
        """Combine an externally supplied FastDepends config with the current one.

        The incoming `config` is the left operand of `|` and the existing
        `fd_config` the right one; the result replaces `self.config.fd_config`.
        """
        combined = config | self.config.fd_config
        self.config.fd_config = combined

    async def start(self) -> None:
        """Start all subscribers, then all publishers, and set `running`."""
        # TODO: filter by already running handlers after TestClient refactor
        for subscriber in self.subscribers:
            await subscriber.start()

        for publisher in self.publishers:
            await publisher.start()

        # Only reached when every start() call above completed.
        self.running = True

    def _setup_logger(self) -> None:
        """Register each subscriber's log context, then set up the logger."""
        logger_state = self.config.logger

        for subscriber in self.subscribers:
            subscriber_context = subscriber.get_log_context(None)
            # "message_id" is dropped before registration; a missing key is fine.
            subscriber_context.pop("message_id", None)
            logger_state.params_storage.register_subscriber(subscriber_context)

        logger_state._setup(self.config.fd_config.context)

    async def connect(self) -> ConnectionType:
        """Connect to a remote server.

        `_connect()` and the logger setup run only while no connection is
        stored; later calls return the stored connection unchanged.
        """
        if self._connection is not None:
            return self._connection

        self._connection = await self._connect()
        self._setup_logger()
        return self._connection

    @abstractmethod
    async def _connect(self) -> ConnectionType:
        """Create and return the actual connection (implemented by subclasses)."""
        raise NotImplementedError

    async def stop(
        self,
        exc_type: type[BaseException] | None = None,
        exc_val: BaseException | None = None,
        exc_tb: Optional["TracebackType"] = None,
    ) -> None:
        """Stop every subscriber and clear the `running` flag.

        The exception arguments mirror the `__aexit__` signature; this
        implementation does not inspect them.
        """
        for subscriber in self.subscribers:
            await subscriber.stop()

        self.running = False

    @abstractmethod
    async def ping(self, timeout: float | None) -> bool:
        """Check that the connection is alive (implemented by subclasses)."""
        raise NotImplementedError