Coverage for faststream / _internal / endpoint / subscriber / call_item.py: 91%
58 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections import UserList
2from collections.abc import Iterable, Reversible
3from functools import partial
4from inspect import unwrap
5from typing import (
6 TYPE_CHECKING,
7 Any,
8 Generic,
9 Optional,
10 cast,
11)
13from faststream._internal.types import MsgType
14from faststream.exceptions import IgnoredException, SetupError
15from faststream.specification.asyncapi.utils import to_camelcase
17if TYPE_CHECKING:
18 from fast_depends.dependencies import Dependant
20 from faststream._internal.basic_types import AsyncFuncAny, Decorator
21 from faststream._internal.di import FastDependsConfig
22 from faststream._internal.endpoint.call_wrapper import HandlerCallWrapper
23 from faststream._internal.types import (
24 AsyncCallable,
25 AsyncFilter,
26 CustomCallable,
27 SubscriberMiddleware,
28 )
29 from faststream.message import StreamMessage
class HandlerItem(Generic[MsgType]):
    """One registered handler together with its filter, parser and decoder.

    An item pairs a wrapped handler call with the filter that decides
    whether an incoming message should be routed to that handler.
    """

    __slots__ = (
        "dependant",
        "dependencies",
        "filter",
        "handler",
        "item_decoder",
        "item_parser",
    )

    dependant: Any | None

    def __init__(
        self,
        *,
        handler: "HandlerCallWrapper[..., Any]",
        filter: "AsyncFilter[Any]",
        item_parser: Optional["CustomCallable"],
        item_decoder: Optional["CustomCallable"],
        dependencies: Iterable["Dependant"],
    ) -> None:
        """Store the handler pieces; `dependant` stays `None` until `_setup`."""
        self.dependant = None
        self.dependencies = dependencies
        self.item_decoder = item_decoder
        self.item_parser = item_parser
        self.filter = filter
        self.handler = handler

    def __repr__(self) -> str:
        """Show the handler name and the name of the unwrapped filter."""
        raw_filter = unwrap(self.filter)
        # Fall back to `str()` for callables that do not expose `__name__`.
        filter_name = getattr(raw_filter, "__name__", str(raw_filter))
        return f"<'{self.name}': filter='{filter_name}'>"

    def _setup(
        self,
        *,
        parser: "AsyncCallable",
        decoder: "AsyncCallable",
        config: "FastDependsConfig",
        broker_dependencies: Iterable["Dependant"],
        _call_decorators: Reversible["Decorator"],
    ) -> None:
        """Install the final parser/decoder and wrap the handler.

        Does nothing when `dependant` is already set (not `None`).
        """
        if self.dependant is not None:
            return

        self.item_parser = parser
        self.item_decoder = decoder

        # Broker-level dependencies go first, then the item's own ones.
        merged_dependencies = (*broker_dependencies, *self.dependencies)
        self.dependant = self.handler.set_wrapped(
            dependencies=merged_dependencies,
            _call_decorators=_call_decorators,
            config=config,
        )

    @property
    def name(self) -> str:
        """Name of the unwrapped original call, or `""` without a handler."""
        if self.handler is None:
            return ""

        original = unwrap(self.handler._original_call)
        return getattr(original, "__name__", str(original))

    @property
    def description(self) -> str | None:
        """Docstring of the unwrapped original call, or `None` without a handler."""
        if self.handler is None:
            return None

        original = unwrap(self.handler._original_call)
        return getattr(original, "__doc__", None)

    async def is_suitable(
        self,
        msg: MsgType,
        cache: dict[Any, Any],
    ) -> Optional["StreamMessage[MsgType]"]:
        """Parse `msg` and return the parsed message if the filter accepts it.

        The parsed message is stored in `cache` under the parser, so a
        truthy cached value for the same parser is reused instead of
        parsing again. Returns `None` when the filter rejects the message.

        Raises:
            SetupError: if the item parser or decoder is missing (falsy).
        """
        parser = cast("AsyncCallable | None", self.item_parser)
        decoder = cast("AsyncCallable | None", self.item_decoder)
        if not (parser and decoder):
            error_msg = "You should setup `HandlerItem` at first."
            raise SetupError(error_msg)

        parsed = cache.get(parser) or await parser(msg)
        message = cast("StreamMessage[MsgType]", parsed)
        cache[parser] = message

        # NOTE: final decoder will be set for success filter
        message.set_decoder(decoder)

        accepted = await self.filter(message)
        return message if accepted else None

    async def call(
        self,
        /,
        message: "StreamMessage[MsgType]",
        _extra_middlewares: Iterable["SubscriberMiddleware[Any]"],
    ) -> Any:
        """Run the wrapped handler through the given middlewares.

        `self.handler.trigger` is called on every outcome: with no arguments
        for `IgnoredException`/`SystemExit`, with `error=` for any other
        `Exception`, and with `result=` on success. Exceptions are re-raised.
        """
        wrapped: "AsyncFuncAny" = self.handler.call_wrapped

        # Each middleware wraps the call built so far.
        for extra in _extra_middlewares:
            wrapped = partial(extra, wrapped)

        try:
            result = await wrapped(message)

        except (IgnoredException, SystemExit):
            self.handler.trigger()
            raise

        except Exception as exc:
            self.handler.trigger(error=exc)
            raise

        self.handler.trigger(result=result)
        return result
class CallsCollection(UserList[HandlerItem[MsgType]]):
    """List of `HandlerItem` objects with a combined name and description."""

    def add_call(self, call: "HandlerItem[MsgType]") -> None:
        """Append a handler item to the collection."""
        self.data.append(call)

    @property
    def name(self) -> str | None:
        """Camel-cased handler name(s), or `None` for an empty collection.

        A single item yields its own name; several items are joined with
        commas and wrapped in square brackets.
        """
        items = self.data
        if not items:
            return None

        if len(items) == 1:
            return to_camelcase(items[0].name)

        joined = ",".join([to_camelcase(item.name) for item in items])
        return f"[{joined}]"

    @property
    def description(self) -> str | None:
        """Handler description(s), or `None` for an empty collection.

        A single item yields its own description; several items produce one
        `name: description` line each, joined with newlines.
        """
        items = self.data
        if not items:
            return None

        if len(items) == 1:
            return items[0].description

        lines = [
            f"{to_camelcase(item.name)}: {item.description or ''}" for item in items
        ]
        return "\n".join(lines)