Coverage for faststream / _internal / endpoint / subscriber / call_item.py: 91%

58 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections import UserList 

2from collections.abc import Iterable, Reversible 

3from functools import partial 

4from inspect import unwrap 

5from typing import ( 

6 TYPE_CHECKING, 

7 Any, 

8 Generic, 

9 Optional, 

10 cast, 

11) 

12 

13from faststream._internal.types import MsgType 

14from faststream.exceptions import IgnoredException, SetupError 

15from faststream.specification.asyncapi.utils import to_camelcase 

16 

17if TYPE_CHECKING: 

18 from fast_depends.dependencies import Dependant 

19 

20 from faststream._internal.basic_types import AsyncFuncAny, Decorator 

21 from faststream._internal.di import FastDependsConfig 

22 from faststream._internal.endpoint.call_wrapper import HandlerCallWrapper 

23 from faststream._internal.types import ( 

24 AsyncCallable, 

25 AsyncFilter, 

26 CustomCallable, 

27 SubscriberMiddleware, 

28 ) 

29 from faststream.message import StreamMessage 

30 

31 

class HandlerItem(Generic[MsgType]):
    """A single overloaded handler entry.

    Bundles a wrapped handler call with the filter that selects messages
    for it, the parser/decoder pair used to prepare those messages, and
    the extra dependencies passed to the handler when it is wrapped.
    """

    __slots__ = (
        "dependant",
        "dependencies",
        "filter",
        "handler",
        "item_decoder",
        "item_parser",
    )

    # Stays ``None`` until ``_setup`` stores the result of ``set_wrapped``.
    dependant: Any | None

    def __init__(
        self,
        *,
        handler: "HandlerCallWrapper[..., Any]",
        filter: "AsyncFilter[Any]",
        item_parser: Optional["CustomCallable"],
        item_decoder: Optional["CustomCallable"],
        dependencies: Iterable["Dependant"],
    ) -> None:
        """Store the handler, its filter and the per-item options.

        Args:
            handler: Wrapper around the user callable.
            filter: Async predicate awaited in ``is_suitable``.
            item_parser: Optional item-level parser; replaced in ``_setup``.
            item_decoder: Optional item-level decoder; replaced in ``_setup``.
            dependencies: Dependencies appended after the broker-level ones
                when the handler is wrapped.
        """
        self.dependant = None
        self.dependencies = dependencies
        self.item_decoder = item_decoder
        self.item_parser = item_parser
        self.filter = filter
        self.handler = handler

    def __repr__(self) -> str:
        """Return ``<'handler name': filter='filter name'>``."""
        unwrapped_filter = unwrap(self.filter)
        # Fall back to the string form when the filter has no ``__name__``.
        label = getattr(unwrapped_filter, "__name__", str(unwrapped_filter))
        return f"<'{self.name}': filter='{label}'>"

    def _setup(
        self,
        *,
        parser: "AsyncCallable",
        decoder: "AsyncCallable",
        config: "FastDependsConfig",
        broker_dependencies: Iterable["Dependant"],
        _call_decorators: Reversible["Decorator"],
    ) -> None:
        """Install the final parser/decoder and wrap the handler once.

        Does nothing when ``dependant`` has already been set.
        """
        if self.dependant is not None:
            return

        self.item_parser = parser
        self.item_decoder = decoder

        # Broker-level dependencies come first, then this item's own.
        combined = (*broker_dependencies, *self.dependencies)
        self.dependant = self.handler.set_wrapped(
            dependencies=combined,
            _call_decorators=_call_decorators,
            config=config,
        )

    @property
    def name(self) -> str:
        """Name of the unwrapped original call, or ``""`` without a handler."""
        handler = self.handler
        if handler is None:
            return ""

        original = unwrap(handler._original_call)
        return getattr(original, "__name__", str(original))

    @property
    def description(self) -> str | None:
        """Docstring of the unwrapped original call, or ``None`` if absent."""
        handler = self.handler
        if handler is None:
            return None

        original = unwrap(handler._original_call)
        return getattr(original, "__doc__", None)

    async def is_suitable(
        self,
        msg: MsgType,
        cache: dict[Any, Any],
    ) -> Optional["StreamMessage[MsgType]"]:
        """Parse ``msg`` and return it if this item's filter accepts it.

        Args:
            msg: Raw message handed to the parser.
            cache: Mapping keyed by parser; a truthy cached entry is reused
                instead of parsing again, and the entry is (re)stored.

        Returns:
            The parsed message when the filter result is truthy, else ``None``.

        Raises:
            SetupError: If the parser or the decoder is not set.
        """
        parser = cast("AsyncCallable | None", self.item_parser)
        decoder = cast("AsyncCallable | None", self.item_decoder)
        if not (parser and decoder):
            raise SetupError("You should setup `HandlerItem` at first.")

        cached = cache.get(parser)
        message = cast(
            "StreamMessage[MsgType]",
            cached or await parser(msg),
        )
        cache[parser] = message

        # NOTE: final decoder will be set for success filter
        message.set_decoder(decoder)

        accepted = await self.filter(message)
        return message if accepted else None

    async def call(
        self,
        /,
        message: "StreamMessage[MsgType]",
        _extra_middlewares: Iterable["SubscriberMiddleware[Any]"],
    ) -> Any:
        """Run the wrapped handler through the given middlewares.

        Each middleware is bound around the call built so far, so the last
        one in ``_extra_middlewares`` is the outermost. The outcome is passed
        to ``handler.trigger`` before the result is returned or the
        exception is re-raised.
        """
        wrapped: AsyncFuncAny = self.handler.call_wrapped
        for layer in _extra_middlewares:
            wrapped = partial(layer, wrapped)

        try:
            outcome = await wrapped(message)
        except (IgnoredException, SystemExit):
            # Triggered without an error payload, then propagated.
            self.handler.trigger()
            raise
        except Exception as exc:
            self.handler.trigger(error=exc)
            raise

        # Reached only on success: both handlers above re-raise.
        self.handler.trigger(result=outcome)
        return outcome

155 

156 

class CallsCollection(UserList[HandlerItem[MsgType]]):
    """List of ``HandlerItem`` objects with a combined name and description."""

    def add_call(self, call: "HandlerItem[MsgType]") -> None:
        """Append ``call`` to the end of the collection."""
        self.data.append(call)

    @property
    def name(self) -> str | None:
        """Camel-cased name of the stored calls.

        ``None`` when empty, the single item's name when there is one item,
        otherwise all names comma-joined inside square brackets.
        """
        labels = [to_camelcase(item.name) for item in self.data]

        if not labels:
            return None

        if len(labels) == 1:
            return labels[0]

        joined = ",".join(labels)
        return f"[{joined}]"

    @property
    def description(self) -> str | None:
        """Description of the stored calls.

        ``None`` when empty, the single item's description when there is one
        item, otherwise one ``Name: description`` line per item.
        """
        items = self.data

        if not items:
            return None

        if len(items) == 1:
            return items[0].description

        rows = []
        for item in items:
            # A missing description is rendered as an empty string.
            rows.append(f"{to_camelcase(item.name)}: {item.description or ''}")
        return "\n".join(rows)