Coverage for faststream / _internal / configs / broker.py: 98%

97 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Iterable, Sequence 

2from dataclasses import dataclass, field 

3from typing import TYPE_CHECKING, Any, Generic, Optional, Union 

4 

5from typing_extensions import TypeVar as TypeVar313 

6 

7from faststream._internal.constants import EMPTY 

8from faststream._internal.di import FastDependsConfig 

9from faststream._internal.logger import LoggerState 

10from faststream._internal.producer import ProducerProto, ProducerUnset 

11 

12if TYPE_CHECKING: 

13 from fast_depends.dependencies import Dependant 

14 

15 from faststream._internal.parser import CodecProto 

16 from faststream._internal.types import BrokerMiddleware, CustomCallable 

17 from faststream.middlewares import AckPolicy 

18 

19 

20@dataclass(kw_only=True) 

21class BrokerConfig: 

22 prefix: str = "" 

23 include_in_schema: bool | None = True 

24 

25 broker_middlewares: Sequence["BrokerMiddleware[Any]"] = () 

26 broker_parser: Optional["CustomCallable"] = None 

27 broker_decoder: Optional["CustomCallable"] = None 

28 broker_codec: Optional["CodecProto"] = None 

29 

30 producer: "ProducerProto[Any]" = field(default_factory=ProducerUnset) 

31 logger: "LoggerState" = field(default_factory=LoggerState) 

32 fd_config: "FastDependsConfig" = field(default_factory=FastDependsConfig) 

33 

34 # subscriber options 

35 broker_dependencies: Iterable["Dependant"] = () 

36 graceful_timeout: float | None = None 

37 ack_policy: "AckPolicy" = field(default_factory=lambda: EMPTY) 

38 extra_context: dict[str, Any] = field(default_factory=dict) 

39 

40 def __repr__(self) -> str: 

41 return f"{self.__class__.__name__}(id: {id(self)})" 

42 

43 def __bool__(self) -> bool: 

44 return bool( 

45 self.include_in_schema is not None 

46 or self.broker_middlewares 

47 or self.broker_dependencies 

48 or self.prefix, 

49 ) 

50 

51 def add_middleware(self, middleware: "BrokerMiddleware[Any]") -> None: 

52 self.broker_middlewares = (*self.broker_middlewares, middleware) 

53 

54 def insert_middleware(self, middleware: "BrokerMiddleware[Any]") -> None: 

55 self.broker_middlewares = (middleware, *self.broker_middlewares) 

56 

57 

58BrokerConfigType = TypeVar313( 

59 "BrokerConfigType", 

60 bound=BrokerConfig, 

61 default=BrokerConfig, 

62) 

63 

64ConfigType = Union["ConfigComposition[Any]", "BrokerConfigType", BrokerConfig] 

65 

66 

67class ConfigComposition(Generic[BrokerConfigType]): # noqa: PLR0904 

68 def __init__(self, config: BrokerConfigType) -> None: 

69 self.configs: tuple[ConfigType, ...] = (config,) 

70 

71 @property 

72 def broker_config(self) -> "BrokerConfigType": 

73 assert self.configs 

74 return self.configs[0] # type: ignore[return-value] 

75 

76 def __repr__(self) -> str: 

77 return f"{self.__class__.__name__}({', '.join(repr(c) for c in self.configs)})" 

78 

79 def add_config(self, config: "ConfigType") -> None: 

80 self.configs = (config, *self.configs) 

81 

82 def reset(self) -> None: 

83 self.configs = (self.configs[-1],) 

84 

85 # broker priority options 

86 @property 

87 def producer(self) -> "ProducerProto[Any]": 

88 return self.broker_config.producer 

89 

90 @property 

91 def logger(self) -> "LoggerState": 

92 return self.broker_config.logger 

93 

94 @property 

95 def fd_config(self) -> "FastDependsConfig": 

96 return self.broker_config.fd_config 

97 

98 @fd_config.setter 

99 def fd_config(self, value: "FastDependsConfig") -> None: 

100 self.broker_config.fd_config = value 

101 

102 @property 

103 def graceful_timeout(self) -> float | None: 

104 return self.broker_config.graceful_timeout 

105 

106 def add_middleware(self, middleware: "BrokerMiddleware[Any]") -> None: 

107 self.broker_config.add_middleware(middleware) 

108 

109 def insert_middleware(self, middleware: "BrokerMiddleware[Any]") -> None: 

110 self.broker_config.insert_middleware(middleware) 

111 

112 def __getattr__(self, name: str) -> Any: 

113 return getattr(self.broker_config, name) 

114 

115 # first valuable option 

116 @property 

117 def broker_parser(self) -> Optional["CustomCallable"]: 

118 for c in self.configs: 

119 if c.broker_parser: 

120 return c.broker_parser 

121 return None 

122 

123 @property 

124 def broker_decoder(self) -> Optional["CustomCallable"]: 

125 for c in self.configs: 

126 if c.broker_decoder: 

127 return c.broker_decoder 

128 return None 

129 

130 @property 

131 def broker_codec(self) -> Optional["CodecProto"]: 

132 for c in self.configs: 

133 if c.broker_codec: 

134 return c.broker_codec 

135 return None 

136 

137 @property 

138 def ack_policy(self) -> "AckPolicy": 

139 for c in reversed(self.configs): 

140 ack = c.ack_policy 

141 if ack is not EMPTY: 

142 return ack 

143 return EMPTY # type: ignore[no-any-return] 

144 

145 # merged options 

146 @property 

147 def extra_context(self) -> dict[str, Any]: 

148 context: dict[str, Any] = {} 

149 for c in self.configs: 

150 context |= c.extra_context 

151 return context 

152 

153 @property 

154 def prefix(self) -> str: 

155 return "".join(c.prefix for c in self.configs) 

156 

157 @property 

158 def include_in_schema(self) -> bool: 

159 return all(c.include_in_schema is not False for c in self.configs) 

160 

161 @property 

162 def broker_middlewares(self) -> Sequence["BrokerMiddleware[Any]"]: 

163 return [m for c in self.configs for m in c.broker_middlewares] 

164 

165 @property 

166 def broker_dependencies(self) -> Iterable["Dependant"]: 

167 return (b for c in self.configs for b in c.broker_dependencies)