Coverage for faststream / _internal / fastapi / route.py: 94%
68 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2import inspect
3from collections.abc import Awaitable, Callable, Iterable
4from contextlib import AsyncExitStack
5from functools import wraps
6from itertools import dropwhile
7from typing import (
8 TYPE_CHECKING,
9 Any,
10 Optional,
11 Union,
12)
14from fast_depends.dependencies import Dependant
15from fastapi.routing import run_endpoint_function, serialize_response
16from starlette.requests import Request
18from faststream._internal.context import Context, ContextRepo
19from faststream._internal.types import P_HandlerParams, T_HandlerReturn
20from faststream.exceptions import SetupError
21from faststream.response import Response, ensure_response
23from ._compat import (
24 FASTAPI_V106,
25 FASTAPI_V121,
26 create_response_field,
27 raise_fastapi_validation_error,
28 solve_faststream_dependency,
29)
30from .get_dependant import (
31 get_fastapi_native_dependant,
32 has_forbidden_types,
33 is_faststream_decorated,
34 mark_faststream_decorated,
35)
37if TYPE_CHECKING:
38 from fastapi import params
39 from fastapi._compat import ModelField
40 from fastapi.dependencies.models import Dependant as FastAPIDependant
41 from fastapi.types import IncEx
43 from faststream.message import StreamMessage as NativeMessage
45 from .config import FastAPIConfig
class StreamMessage(Request):
    """Request-shaped carrier for the parts of a decoded broker message.

    The instance is populated by hand in ``__init__``; the parent
    ``Request`` initializer is not invoked, so only the attributes assigned
    there are set on the object.
    """

    # Mapping that exposes the path values under the "path_params" key.
    scope: "dict[str, Any]"
    # Always starts out as an empty mapping.
    _cookies: "dict[str, Any]"
    # Header values handed in by the caller.
    _headers: "dict[str, Any]"  # type: ignore[assignment]
    # Decoded payload handed in by the caller.
    _body: Union["dict[str, Any]", list[Any]]  # type: ignore[assignment]
    # The very same mapping object as ``scope["path_params"]``.
    _query_params: "dict[str, Any]"  # type: ignore[assignment]

    def __init__(
        self,
        *,
        body: Union["dict[str, Any]", list[Any]],
        headers: "dict[str, Any]",
        path: "dict[str, Any]",
    ) -> None:
        """Store the message parts on the instance.

        Args:
            body: Decoded message payload (mapping or list).
            headers: Header values to expose on the request.
            path: Values exposed both as query params and as path params.
        """
        # NOTE(review): presumably these private names are the caches the
        # parent ``Request`` accessors read from - confirm against Starlette.
        self._cookies = {}
        self._query_params = path
        # Shares the ``path`` mapping rather than copying it.
        self.scope = {"path_params": path}
        self._body = body
        self._headers = headers
def wrap_callable_to_fastapi_compatible(
    user_callable: Callable[P_HandlerParams, T_HandlerReturn],
    *,
    fastapi_config: "FastAPIConfig",
    dependencies: Iterable["params.Depends"],
    response_model: Any,
    response_model_include: Optional["IncEx"],
    response_model_exclude: Optional["IncEx"],
    response_model_by_alias: bool,
    response_model_exclude_unset: bool,
    response_model_exclude_defaults: bool,
    response_model_exclude_none: bool,
    context: "ContextRepo",
) -> Callable[["NativeMessage[Any]"], Awaitable[Any]]:
    """Turn a user handler into a consumer driven by FastAPI dependencies.

    Args:
        user_callable: Handler supplied by the user.
        fastapi_config: Forwarded unchanged to the parser builder.
        dependencies: Extra ``fastapi.Depends`` entries; materialized into a
            list before being handed to the dependant builder.
        response_model: When truthy, a serialization-mode response field
            named ``"ResponseModel"`` is created from it.
        response_model_include: Forwarded to the parser builder.
        response_model_exclude: Forwarded to the parser builder.
        response_model_by_alias: Forwarded to the parser builder.
        response_model_exclude_unset: Forwarded to the parser builder.
        response_model_exclude_defaults: Forwarded to the parser builder.
        response_model_exclude_none: Forwarded to the parser builder.
        context: Context repository forwarded to the parser builder.

    Returns:
        ``user_callable`` itself when it is already marked as decorated,
        otherwise the freshly built consumer carrying the handler's metadata.

    Raises:
        SetupError: If the handler signature uses the native FastStream
            ``Depends`` or ``Context`` markers.
    """
    # Native FastStream markers are rejected up front, `Depends` first.
    if has_forbidden_types(user_callable, (Dependant,)):
        raise SetupError(
            f"Incorrect `faststream.Depends` usage at `{user_callable.__name__}`. "
            "For FastAPI integration use `fastapi.Depends` instead.",
        )

    if has_forbidden_types(user_callable, (Context,)):
        raise SetupError(
            f"Incorrect `faststream.Context` usage at `{user_callable.__name__}`. "
            "For FastAPI integration use `faststream.[broker].fastapi.Context` instead.",
        )

    # Already wrapped once: hand it back untouched instead of wrapping again.
    if is_faststream_decorated(user_callable):
        return user_callable  # type: ignore[return-value]

    # Truthiness (not an identity check against None) decides whether a
    # response field is built.
    response_field = (
        create_response_field(
            name="ResponseModel",
            type_=response_model,
            mode="serialization",
        )
        if response_model
        else None
    )

    native_dependant = get_fastapi_native_dependant(
        user_callable,
        dependencies=list(dependencies),
    )

    parsed_callable = build_faststream_to_fastapi_parser(
        dependent=native_dependant,
        fastapi_config=fastapi_config,
        response_field=response_field,
        response_model_include=response_model_include,
        response_model_exclude=response_model_exclude,
        response_model_by_alias=response_model_by_alias,
        response_model_exclude_unset=response_model_exclude_unset,
        response_model_exclude_defaults=response_model_exclude_defaults,
        response_model_exclude_none=response_model_exclude_none,
        context=context,
    )

    # Mark first, then copy the user's metadata onto the consumer.
    mark_faststream_decorated(parsed_callable)
    return wraps(user_callable)(parsed_callable)
def build_faststream_to_fastapi_parser(
    *,
    dependent: "FastAPIDependant",
    fastapi_config: "FastAPIConfig",
    context: "ContextRepo",
    response_field: Optional["ModelField"],
    response_model_include: Optional["IncEx"],
    response_model_exclude: Optional["IncEx"],
    response_model_by_alias: bool,
    response_model_exclude_unset: bool,
    response_model_exclude_defaults: bool,
    response_model_exclude_none: bool,
) -> Callable[["NativeMessage[Any]"], Awaitable[Any]]:
    """Build the consumer that adapts a broker message for the FastAPI executor.

    Args:
        dependent: Dependant describing the handler; its ``call`` must be set.
        fastapi_config: Forwarded to ``make_fastapi_execution``.
        context: Context repository, exposed to the handler through the
            ``"context__"`` header entry.
        response_field: Forwarded to ``make_fastapi_execution``.
        response_model_include: Forwarded to ``make_fastapi_execution``.
        response_model_exclude: Forwarded to ``make_fastapi_execution``.
        response_model_by_alias: Forwarded to ``make_fastapi_execution``.
        response_model_exclude_unset: Forwarded to ``make_fastapi_execution``.
        response_model_exclude_defaults: Forwarded to ``make_fastapi_execution``.
        response_model_exclude_none: Forwarded to ``make_fastapi_execution``.

    Returns:
        An async callable that takes the native message and returns whatever
        the executor returns.
    """
    assert dependent.call

    consume = make_fastapi_execution(
        dependent=dependent,
        fastapi_config=fastapi_config,
        response_field=response_field,
        response_model_include=response_model_include,
        response_model_exclude=response_model_exclude,
        response_model_by_alias=response_model_by_alias,
        response_model_exclude_unset=response_model_exclude_unset,
        response_model_exclude_defaults=response_model_exclude_defaults,
        response_model_exclude_none=response_model_exclude_none,
    )

    dependency_names = {dep.name for dep in dependent.dependencies}

    # Name of the first handler parameter that is not one of the dependant's
    # sub-dependencies; None when every parameter is a dependency (or there
    # are no parameters at all).
    first_arg = next(
        (
            param_name
            for param_name in inspect.signature(dependent.call).parameters
            if param_name not in dependency_names
        ),
        None,
    )

    async def parsed_consumer(message: "NativeMessage[Any]") -> Any:
        """Convert the FastStream message into a ``StreamMessage`` and run it."""
        # Decoding happens unconditionally, even when the payload is unused.
        decoded = await message.decode()

        if first_arg is None:
            # Nothing but dependencies to feed: pass an empty request that
            # carries only the context.
            empty_request = StreamMessage(
                body={},
                headers={"context__": context},
                path={},
            )
            return await consume(empty_request, message)

        fastapi_body: dict[str, Any] | list[Any]
        if isinstance(decoded, dict):
            # An empty mapping is replaced by a fresh one; body and path
            # values share the same object.
            fastapi_body = decoded or {}
            path_values = fastapi_body
        elif isinstance(decoded, list):
            fastapi_body = decoded
            path_values = {}
        else:
            # Any other payload is bound to the first free parameter name.
            fastapi_body = {first_arg: decoded}
            path_values = fastapi_body

        request_headers = {"context__": context, **message.headers}
        # Values from the message path override same-named body values.
        request_path = {**path_values, **message.path}

        stream_message = StreamMessage(
            body=fastapi_body,
            headers=request_headers,
            path=request_path,
        )
        return await consume(stream_message, message)

    return parsed_consumer
def make_fastapi_execution(
    *,
    dependent: "FastAPIDependant",
    fastapi_config: "FastAPIConfig",
    response_field: Optional["ModelField"],
    response_model_include: Optional["IncEx"],
    response_model_exclude: Optional["IncEx"],
    response_model_by_alias: bool,
    response_model_exclude_unset: bool,
    response_model_exclude_defaults: bool,
    response_model_exclude_none: bool,
) -> Callable[
    ["StreamMessage", "NativeMessage[Any]"],
    Awaitable[Response],
]:
    """Build the coroutine that solves dependencies and runs the handler.

    Args:
        dependent: Dependant whose ``call`` is the user handler.
        fastapi_config: Supplies ``application`` and
            ``dependency_overrides_provider`` for dependency solving.
        response_field: Field used to serialize the response body, if any.
        response_model_include: Passed to ``serialize_response`` as ``include``.
        response_model_exclude: Passed to ``serialize_response`` as ``exclude``.
        response_model_by_alias: Passed to ``serialize_response`` as ``by_alias``.
        response_model_exclude_unset: Passed as ``exclude_unset``.
        response_model_exclude_defaults: Passed as ``exclude_defaults``.
        response_model_exclude_none: Passed as ``exclude_none``.

    Returns:
        An async callable taking the ``StreamMessage`` request and the native
        message, and returning the handler result wrapped in a ``Response``
        whose body has been serialized.
    """
    # `asyncio.iscoroutinefunction` is deprecated since Python 3.14; the
    # `inspect` variant is its documented replacement.
    is_coroutine = inspect.iscoroutinefunction(dependent.call)

    async def app(
        request: "StreamMessage",
        raw_message: "NativeMessage[Any]",  # to support BackgroundTasks by middleware
    ) -> Response:
        """Consume StreamMessage and return user function result."""
        async with AsyncExitStack() as stack:
            kwargs: dict[str, Any] = {}

            # NOTE(review): the flags come from `._compat`; presumably they
            # gate on the installed FastAPI version - confirm there.
            if FASTAPI_V121:
                request.scope["fastapi_inner_astack"] = stack
                # The nested stack is entered on the outer one, so it is
                # closed together with it.
                function_stack = AsyncExitStack()
                await stack.enter_async_context(function_stack)
                request.scope["fastapi_function_astack"] = function_stack

            if FASTAPI_V106:
                kwargs = {"async_exit_stack": stack}

            else:
                request.scope["fastapi_astack"] = stack

            request.scope["app"] = fastapi_config.application

            solved_result = await solve_faststream_dependency(
                request=request,
                dependant=dependent,
                dependency_overrides_provider=fastapi_config.dependency_overrides_provider,
                **kwargs,
            )

            # Attached before the error check, so the tasks are set on the
            # message even when validation fails.
            raw_message.background = solved_result.background_tasks  # type: ignore[attr-defined]

            if solved_result.errors:
                raise_fastapi_validation_error(solved_result.errors, request._body)  # type: ignore[arg-type]

            function_result = await run_endpoint_function(
                dependant=dependent,
                values=solved_result.values,
                is_coroutine=is_coroutine,
            )

            response = ensure_response(function_result)

            response.body = await serialize_response(
                response_content=response.body,
                field=response_field,
                include=response_model_include,
                exclude=response_model_exclude,
                by_alias=response_model_by_alias,
                exclude_unset=response_model_exclude_unset,
                exclude_defaults=response_model_exclude_defaults,
                exclude_none=response_model_exclude_none,
                is_coroutine=is_coroutine,
            )

            return response

        # The block above always returns or raises. NOTE(review): presumably
        # kept for type checkers, which assume an exit stack may suppress
        # exceptions.
        msg = "unreachable"
        raise AssertionError(msg)

    return app