Coverage for faststream / _internal / configs / broker.py: 98%
97 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Iterable, Sequence
2from dataclasses import dataclass, field
3from typing import TYPE_CHECKING, Any, Generic, Optional, Union
5from typing_extensions import TypeVar as TypeVar313
7from faststream._internal.constants import EMPTY
8from faststream._internal.di import FastDependsConfig
9from faststream._internal.logger import LoggerState
10from faststream._internal.producer import ProducerProto, ProducerUnset
12if TYPE_CHECKING:
13 from fast_depends.dependencies import Dependant
15 from faststream._internal.parser import CodecProto
16 from faststream._internal.types import BrokerMiddleware, CustomCallable
17 from faststream.middlewares import AckPolicy
@dataclass(kw_only=True)
class BrokerConfig:
    """Keyword-only container for the options shared at broker level.

    Every field has a default, so ``BrokerConfig()`` is valid. The middleware
    collection is treated as immutable: the helper methods below rebind the
    attribute to a new tuple instead of mutating the existing sequence.
    """

    prefix: str = ""
    # Tri-state flag: ``None`` is a distinct value from ``True``/``False``
    # (see ``__bool__``, which tests it against ``None`` explicitly).
    include_in_schema: bool | None = True

    broker_middlewares: Sequence["BrokerMiddleware[Any]"] = ()
    broker_parser: Optional["CustomCallable"] = None
    broker_decoder: Optional["CustomCallable"] = None
    broker_codec: Optional["CodecProto"] = None

    # Factories are used so that each config instance gets its own objects.
    producer: "ProducerProto[Any]" = field(default_factory=ProducerUnset)
    logger: "LoggerState" = field(default_factory=LoggerState)
    fd_config: "FastDependsConfig" = field(default_factory=FastDependsConfig)

    # subscriber options
    broker_dependencies: Iterable["Dependant"] = ()
    graceful_timeout: float | None = None
    # ``EMPTY`` is the "not configured" sentinel; it is supplied through a
    # factory rather than as a plain default.
    ack_policy: "AckPolicy" = field(default_factory=lambda: EMPTY)
    extra_context: dict[str, Any] = field(default_factory=dict)

    def __repr__(self) -> str:
        """Return ``ClassName(id: <id>)`` instead of the dataclass field dump."""
        class_name = self.__class__.__name__
        return f"{class_name}(id: {id(self)})"

    def __bool__(self) -> bool:
        """Tell whether this config carries any schema/middleware/prefix setting.

        True as soon as ``include_in_schema`` is anything but ``None``;
        otherwise true when middlewares, dependencies or prefix are truthy.
        """
        if self.include_in_schema is not None:
            return True
        return bool(
            self.broker_middlewares or self.broker_dependencies or self.prefix
        )

    def add_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
        """Append ``middleware`` after the ones already registered."""
        existing = tuple(self.broker_middlewares)
        self.broker_middlewares = existing + (middleware,)

    def insert_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
        """Place ``middleware`` before the ones already registered."""
        existing = tuple(self.broker_middlewares)
        self.broker_middlewares = (middleware,) + existing
# Type variable for the concrete config class a composition is built around.
# It is bounded by ``BrokerConfig`` and also defaults to it, so an
# unparametrised generic resolves to the base config type.
BrokerConfigType = TypeVar313(
    "BrokerConfigType", bound=BrokerConfig, default=BrokerConfig
)

# Anything that may be stored in ``ConfigComposition.configs``: a nested
# composition, a concrete broker-specific config, or the base config.
# ``Union`` is required here because two members are string forward refs.
ConfigType = Union[
    "ConfigComposition[Any]",
    "BrokerConfigType",
    BrokerConfig,
]
class ConfigComposition(Generic[BrokerConfigType]):  # noqa: PLR0904
    """Ordered stack of configs that is read as if it were one config.

    ``configs`` is a tuple. ``add_config`` prepends, so index 0 holds the most
    recently added config and the last index holds the config the composition
    was created with. Options are resolved in one of three ways:

    * *broker priority* - read from ``configs[0]`` only (``producer``,
      ``logger``, ``fd_config``, ``graceful_timeout`` and any attribute that
      is not defined on this class);
    * *first valuable* - the first configured value found while scanning the
      stack (``broker_parser``/``broker_decoder``/``broker_codec`` scan from
      index 0, ``ack_policy`` scans from the last index);
    * *merged* - combined across every config (``extra_context``, ``prefix``,
      ``include_in_schema``, ``broker_middlewares``, ``broker_dependencies``).
    """

    def __init__(self, config: BrokerConfigType) -> None:
        """Start the stack with ``config`` as its only element."""
        self.configs: tuple[ConfigType, ...] = (config,)

    @property
    def broker_config(self) -> "BrokerConfigType":
        """Return the config at the front of the stack (``configs[0]``)."""
        assert self.configs
        return self.configs[0]  # type: ignore[return-value]

    def __repr__(self) -> str:
        """Return the class name wrapping the reprs of all stacked configs."""
        return f"{self.__class__.__name__}({', '.join(repr(c) for c in self.configs)})"

    def add_config(self, config: "ConfigType") -> None:
        """Push ``config`` onto the front of the stack."""
        self.configs = (config, *self.configs)

    def reset(self) -> None:
        """Drop every added config, keeping only the last (original) one."""
        self.configs = (self.configs[-1],)

    # broker priority options
    @property
    def producer(self) -> "ProducerProto[Any]":
        """Producer of the front config."""
        return self.broker_config.producer

    @property
    def logger(self) -> "LoggerState":
        """Logger state of the front config."""
        return self.broker_config.logger

    @property
    def fd_config(self) -> "FastDependsConfig":
        """Dependency-injection config of the front config."""
        return self.broker_config.fd_config

    @fd_config.setter
    def fd_config(self, value: "FastDependsConfig") -> None:
        """Store ``value`` on the front config (not on the composition)."""
        self.broker_config.fd_config = value

    @property
    def graceful_timeout(self) -> float | None:
        """Graceful timeout of the front config."""
        return self.broker_config.graceful_timeout

    def add_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
        """Append ``middleware`` to the front config's middlewares."""
        self.broker_config.add_middleware(middleware)

    def insert_middleware(self, middleware: "BrokerMiddleware[Any]") -> None:
        """Prepend ``middleware`` to the front config's middlewares."""
        self.broker_config.insert_middleware(middleware)

    def __getattr__(self, name: str) -> Any:
        """Delegate attributes not defined here to the front config.

        Raises:
            AttributeError: if the front config lacks ``name``, or if this
                instance has no ``configs`` yet.
        """
        # ``__getattr__`` runs only after normal lookup failed. When
        # ``configs`` itself is missing (instance built via ``__new__`` by
        # ``copy``/``pickle``, which then probe e.g. ``__setstate__`` with
        # ``hasattr``), going through ``self.broker_config`` would look up
        # ``self.configs`` again and recurse until ``RecursionError`` - which
        # ``hasattr`` does not swallow. Fail with a regular AttributeError.
        if "configs" not in self.__dict__:
            msg = f"{type(self).__name__!r} object has no attribute {name!r}"
            raise AttributeError(msg)
        return getattr(self.broker_config, name)

    # first valuable option
    @property
    def broker_parser(self) -> Optional["CustomCallable"]:
        """First truthy ``broker_parser`` scanning from index 0, else ``None``."""
        for c in self.configs:
            if c.broker_parser:
                return c.broker_parser
        return None

    @property
    def broker_decoder(self) -> Optional["CustomCallable"]:
        """First truthy ``broker_decoder`` scanning from index 0, else ``None``."""
        for c in self.configs:
            if c.broker_decoder:
                return c.broker_decoder
        return None

    @property
    def broker_codec(self) -> Optional["CodecProto"]:
        """First truthy ``broker_codec`` scanning from index 0, else ``None``."""
        for c in self.configs:
            if c.broker_codec:
                return c.broker_codec
        return None

    @property
    def ack_policy(self) -> "AckPolicy":
        """First ``ack_policy`` that is not ``EMPTY``, scanning from the end.

        Unlike the parser/decoder/codec lookups this walks the stack in
        reverse, so the originally supplied config is consulted first.
        Returns ``EMPTY`` when no config sets a policy.
        """
        for c in reversed(self.configs):
            ack = c.ack_policy
            # Identity check: ``EMPTY`` is a sentinel, not a falsy value.
            if ack is not EMPTY:
                return ack
        return EMPTY  # type: ignore[no-any-return]

    # merged options
    @property
    def extra_context(self) -> dict[str, Any]:
        """New dict merging every config's ``extra_context``.

        Configs are applied from index 0 onward, so on a key collision the
        value from the config nearer the end of the stack wins.
        """
        context: dict[str, Any] = {}
        for c in self.configs:
            context |= c.extra_context
        return context

    @property
    def prefix(self) -> str:
        """Concatenation of all prefixes in stack order (index 0 first)."""
        return "".join(c.prefix for c in self.configs)

    @property
    def include_in_schema(self) -> bool:
        """``False`` if any config sets the flag to ``False``, else ``True``.

        ``None`` counts as "no opinion" and does not exclude.
        """
        return all(c.include_in_schema is not False for c in self.configs)

    @property
    def broker_middlewares(self) -> Sequence["BrokerMiddleware[Any]"]:
        """New list of all middlewares, flattened in stack order."""
        return [m for c in self.configs for m in c.broker_middlewares]

    @property
    def broker_dependencies(self) -> Iterable["Dependant"]:
        """Lazy, single-pass iterator over all dependencies in stack order.

        A fresh generator is produced on every access.
        """
        return (b for c in self.configs for b in c.broker_dependencies)