Coverage for faststream / _internal / fastapi / router.py: 96%
89 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import json
2import warnings
3from abc import abstractmethod
4from collections.abc import (
5 AsyncIterator,
6 Awaitable,
7 Callable,
8 Iterable,
9 Mapping,
10 Sequence,
11)
12from contextlib import asynccontextmanager
13from enum import Enum
14from typing import (
15 TYPE_CHECKING,
16 Any,
17 Generic,
18 Optional,
19 Union,
20 cast,
21 overload,
22)
24from fastapi.datastructures import Default
25from fastapi.responses import HTMLResponse
26from fastapi.routing import APIRoute, APIRouter
27from fastapi.utils import generate_unique_id
28from starlette.responses import JSONResponse, Response
29from starlette.routing import BaseRoute, _DefaultLifespan
31from faststream._internal.application import StartAbleApplication
32from faststream._internal.broker import BrokerRouter
33from faststream._internal.context import ContextRepo
34from faststream._internal.di.config import FastDependsConfig
35from faststream._internal.types import (
36 MsgType,
37 P_HandlerParams,
38 T_HandlerReturn,
39)
40from faststream._internal.utils.functions import to_async
41from faststream.middlewares import BaseMiddleware
42from faststream.specification.asyncapi.site import get_asyncapi_html
44from .config import FastAPIConfig
45from .get_dependant import get_fastapi_dependant
46from .route import wrap_callable_to_fastapi_compatible
48if TYPE_CHECKING:
49 from types import TracebackType
51 from fastapi import FastAPI, params
52 from fastapi.background import BackgroundTasks
53 from fastapi.types import IncEx
54 from starlette import routing
55 from starlette.types import ASGIApp, AppType, Lifespan
57 from faststream._internal.broker import BrokerUsecase
58 from faststream._internal.endpoint.call_wrapper import HandlerCallWrapper
59 from faststream._internal.endpoint.publisher import PublisherUsecase
60 from faststream._internal.proto import NameRequired
61 from faststream._internal.types import BrokerMiddleware
62 from faststream.message import StreamMessage
63 from faststream.specification.base import SpecificationFactory
64 from faststream.specification.schema import Tag, TagDict
class _BackgroundMiddleware(BaseMiddleware):
    """Broker middleware that awaits a message's ``background`` callable on success.

    The callable is looked up as the ``background`` attribute of the object
    stored under the local context key ``"message"``; when it is absent or
    falsy nothing extra is executed.
    """

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_val: BaseException | None = None,
        exc_tb: Optional["TracebackType"] = None,
    ) -> bool | None:
        """Run pending background work, then defer to the base ``after_processed``.

        Background work is skipped entirely when the handler exited with an
        exception (``exc_type`` is truthy); in that case the context is not
        even queried.
        """
        if not exc_type:
            current_message = self.context.get_local("message")
            pending = cast(
                "BackgroundTasks | None",
                getattr(current_message, "background", None),
            )
            if pending:
                await pending()

        return await super().after_processed(exc_type, exc_val, exc_tb)
class StreamRouter(APIRouter, StartAbleApplication, Generic[MsgType]):
    """FastAPI ``APIRouter`` that owns a broker and binds it to the app lifespan.

    The router instantiates ``broker_class`` in ``__init__``, wraps the
    application lifespan so the broker is started on startup and stopped on
    shutdown, adapts subscriber callables to FastAPI-style dependency
    resolution, and (optionally) exposes AsyncAPI documentation routes.
    """

    # Concrete broker type; called in ``__init__`` to build the broker.
    broker_class: type["BrokerUsecase[MsgType, Any]"]
    # Broker instance used by ``subscriber``/``include_router``/lifespan.
    # NOTE(review): presumably assigned by ``_init_setupable_`` - confirm.
    broker: "BrokerUsecase[MsgType, Any]"
    # Router serving the AsyncAPI pages, or ``None`` when docs are disabled.
    docs_router: APIRouter | None
    # Async callables run (with the app) right after the broker start step;
    # any mapping they return is merged into the lifespan state.
    _after_startup_hooks: list[Callable[[Any], Awaitable[Mapping[str, Any] | None]]]
    # Async callables run (with the app) on clean shutdown, before broker stop.
    _on_shutdown_hooks: list[Callable[[Any], Awaitable[None]]]

    # Declared for typing only; nothing in this class assigns them (the
    # lifespan copies the app's metadata onto ``self.schema`` instead).
    title: str
    description: str
    version: str
    license: dict[str, Any] | None
    contact: dict[str, Any] | None

    def __init__(
        self,
        *connection_args: Any,
        context: ContextRepo | None,
        middlewares: Sequence["BrokerMiddleware[MsgType]"] = (),
        prefix: str = "",
        tags: list[str | Enum] | None = None,
        dependencies: Sequence["params.Depends"] | None = None,
        default_response_class: type["Response"] = Default(JSONResponse),
        responses: dict[int | str, dict[str, Any]] | None = None,
        callbacks: list["routing.BaseRoute"] | None = None,
        routes: list["routing.BaseRoute"] | None = None,
        redirect_slashes: bool = True,
        default: Optional["ASGIApp"] = None,
        dependency_overrides_provider: Any | None = None,
        route_class: type["APIRoute"] = APIRoute,
        on_startup: Sequence[Callable[[], Any]] | None = None,
        on_shutdown: Sequence[Callable[[], Any]] | None = None,
        deprecated: bool | None = None,
        include_in_schema: bool = True,
        setup_state: bool = True,
        lifespan: Optional["Lifespan[Any]"] = None,
        generate_unique_id_function: Callable[["APIRoute"], str] = Default(
            generate_unique_id,
        ),
        # Specification information
        specification: Optional["SpecificationFactory"] = None,
        specification_tags: Iterable[Union["Tag", "TagDict"]] = (),
        schema_url: str | None = "/asyncapi",
        **connection_kwars: Any,
    ) -> None:
        """Create the broker, then initialise the underlying ``APIRouter``.

        Args:
            *connection_args: Positional arguments forwarded to ``broker_class``.
            context: Context repository to use; a fresh ``ContextRepo`` is
                created when a falsy value is given.
            middlewares: User broker middlewares. ``_BackgroundMiddleware`` is
                appended after them.
            setup_state: When true the lifespan yields its state mapping,
                otherwise it yields ``None``.
            lifespan: Optional user lifespan; it is wrapped so that the broker
                is started/stopped around it.
            specification: Forwarded to ``_init_setupable_``.
            specification_tags: Forwarded to the broker as ``tags``.
            schema_url: Base path of the AsyncAPI routes; a falsy value
                disables them.
            **connection_kwars: Keyword arguments forwarded to ``broker_class``.

        The remaining keyword arguments are passed through unchanged to
        ``APIRouter.__init__``.
        """
        broker = self.broker_class(
            *connection_args,
            middlewares=(
                *middlewares,
                # allow to catch background exceptions in user middlewares
                _BackgroundMiddleware,
            ),
            tags=specification_tags,
            apply_types=False,
            **connection_kwars,
        )

        self._init_setupable_(
            broker,
            config=FastDependsConfig(
                get_dependent=get_fastapi_dependant,
                context=context or ContextRepo(),
            ),
            specification=specification,
        )

        self.setup_state = setup_state

        super().__init__(
            prefix=prefix,
            tags=tags,
            dependencies=dependencies,
            default_response_class=default_response_class,
            responses=responses,
            callbacks=callbacks,
            routes=routes,
            redirect_slashes=redirect_slashes,
            default=default,
            dependency_overrides_provider=dependency_overrides_provider,
            route_class=route_class,
            deprecated=deprecated,
            include_in_schema=include_in_schema,
            generate_unique_id_function=generate_unique_id_function,
            # The wrapped lifespan only touches router state lazily (when the
            # app actually starts), so it is safe to build it here.
            lifespan=self._wrap_lifespan(lifespan),
            on_startup=on_startup,
            on_shutdown=on_shutdown,
        )

        self.fastapi_config = FastAPIConfig(
            dependency_overrides_provider=dependency_overrides_provider,
        )

        self.docs_router = (
            self._asyncapi_router(schema_url) if self.include_in_schema else None
        )

        self._after_startup_hooks = []
        self._on_shutdown_hooks = []

        # True while a lifespan invocation that started the broker is active.
        self._lifespan_started = False

    def _subscriber_compatibility_wrapper(
        self,
        dependencies: Iterable["params.Depends"] = (),
        response_model: Any = Default(None),
        response_model_include: Optional["IncEx"] = None,
        response_model_exclude: Optional["IncEx"] = None,
        response_model_by_alias: bool = True,
        response_model_exclude_unset: bool = False,
        response_model_exclude_defaults: bool = False,
        response_model_exclude_none: bool = False,
    ) -> Callable[
        [Callable[..., Any]],
        Callable[["StreamMessage[Any]"], Awaitable[Any]],
    ]:
        """Build a decorator that makes a user handler FastAPI-compatible.

        The returned decorator is meant to be applied before
        ``broker.subscriber``. It forwards every option given here, plus this
        router's ``context`` and ``fastapi_config``, to
        ``wrap_callable_to_fastapi_compatible``.
        """

        def wrapper(
            endpoint: Callable[..., Any],
        ) -> Callable[["StreamMessage[Any]"], Awaitable[Any]]:
            """Patch user function to make it FastAPI-compatible."""
            return wrap_callable_to_fastapi_compatible(
                user_callable=endpoint,
                dependencies=dependencies,
                response_model=response_model,
                response_model_include=response_model_include,
                response_model_exclude=response_model_exclude,
                response_model_by_alias=response_model_by_alias,
                response_model_exclude_unset=response_model_exclude_unset,
                response_model_exclude_defaults=response_model_exclude_defaults,
                response_model_exclude_none=response_model_exclude_none,
                context=self.context,
                fastapi_config=self.fastapi_config,
            )

        return wrapper

    def subscriber(
        self,
        *extra: Union["NameRequired", str],
        dependencies: Iterable["params.Depends"],
        response_model: Any,
        response_model_include: Optional["IncEx"],
        response_model_exclude: Optional["IncEx"],
        response_model_by_alias: bool,
        response_model_exclude_unset: bool,
        response_model_exclude_defaults: bool,
        response_model_exclude_none: bool,
        **broker_kwargs: Any,
    ) -> Callable[
        [Callable[P_HandlerParams, T_HandlerReturn]],
        "HandlerCallWrapper[P_HandlerParams, T_HandlerReturn]",
    ]:
        """Create a broker subscriber whose handler is resolved FastAPI-style.

        Router-level dependencies are placed before the per-subscriber ones.
        The combined tuple is handed both to ``broker.subscriber`` and to the
        compatibility wrapper, which is inserted as the first call decorator
        of the created subscriber.

        Args:
            *extra: Positional arguments forwarded to ``broker.subscriber``.
            dependencies: Extra dependencies for this subscriber.
            response_model: Forwarded to the compatibility wrapper, as are all
                ``response_model_*`` options.
            **broker_kwargs: Keyword arguments forwarded to ``broker.subscriber``.

        Returns:
            The subscriber object returned by the broker.
        """
        dependencies = (*self.dependencies, *dependencies)

        sub = self.broker.subscriber(  # type: ignore[call-arg]
            *extra,  # type: ignore[arg-type]
            dependencies=dependencies,
            **broker_kwargs,
        )

        sub._call_decorators = (
            self._subscriber_compatibility_wrapper(
                dependencies=dependencies,
                response_model=response_model,
                response_model_include=response_model_include,
                response_model_exclude=response_model_exclude,
                response_model_by_alias=response_model_by_alias,
                response_model_exclude_unset=response_model_exclude_unset,
                response_model_exclude_defaults=response_model_exclude_defaults,
                response_model_exclude_none=response_model_exclude_none,
            ),
            *sub._call_decorators,
        )

        return sub

    def _wrap_lifespan(
        self,
        lifespan: Optional["Lifespan[Any]"] = None,
    ) -> "Lifespan[Any]":
        """Wrap a lifespan so the broker is started and stopped around it.

        Args:
            lifespan: User lifespan to wrap; ``_DefaultLifespan(self)`` is used
                when ``None``.

        Returns:
            An async context manager factory taking the application. On enter
            it registers the app in ``fastapi_config``, mounts the docs router
            (copying title/description/version/contact/license from the app
            onto ``self.schema``), enters the wrapped lifespan, starts the
            broker and runs the after-startup hooks. On exit it runs the
            shutdown hooks (clean exit only) and always stops the broker.
        """
        lifespan_context = lifespan if lifespan is not None else _DefaultLifespan(self)

        @asynccontextmanager
        async def start_broker_lifespan(
            app: "FastAPI",
        ) -> AsyncIterator[Mapping[str, Any] | None]:
            """Starts the lifespan of a broker."""
            self.fastapi_config.set_application(app)

            if self.docs_router:
                self.schema.title = app.title
                self.schema.description = app.description
                self.schema.version = app.version
                self.schema.contact = app.contact
                self.schema.license = app.license_info
                app.include_router(self.docs_router)

            async with lifespan_context(app) as maybe_context:
                lifespan_extra = {"broker": self.broker, **(maybe_context or {})}

                # Remember whether *this* invocation owns the start, so that
                # only the owner clears the flag again on exit.
                started_here = not self._lifespan_started
                if started_here:
                    await self._start_broker()
                    self._lifespan_started = True
                else:
                    warnings.warn(
                        "Specifying 'lifespan_context' manually is no longer necessary with FastAPI >= 0.112.2.",
                        category=RuntimeWarning,
                        stacklevel=2,
                    )

                try:
                    # Hooks run inside the ``try`` so that a failing hook does
                    # not leave an already-started broker running.
                    for h in self._after_startup_hooks:
                        lifespan_extra.update(await h(app) or {})

                    if self.setup_state:
                        yield lifespan_extra
                    else:
                        # NOTE: old asgi compatibility
                        yield None

                    # Reached only on a clean exit of the application body.
                    for h in self._on_shutdown_hooks:
                        await h(app)

                finally:
                    try:
                        await self.broker.stop()
                    finally:
                        # Allow the same router to be started again later
                        # (e.g. the app lifespan being entered a second time).
                        if started_here:
                            self._lifespan_started = False

        return start_broker_lifespan  # type: ignore[return-value]

    @overload
    def after_startup(
        self,
        func: Callable[["AppType"], Mapping[str, Any]],
    ) -> Callable[["AppType"], Mapping[str, Any]]: ...

    @overload
    def after_startup(
        self,
        func: Callable[["AppType"], Awaitable[Mapping[str, Any]]],
    ) -> Callable[["AppType"], Awaitable[Mapping[str, Any]]]: ...

    @overload
    def after_startup(
        self,
        func: Callable[["AppType"], None],
    ) -> Callable[["AppType"], None]: ...

    @overload
    def after_startup(
        self,
        func: Callable[["AppType"], Awaitable[None]],
    ) -> Callable[["AppType"], Awaitable[None]]: ...

    def after_startup(
        self,
        func: Callable[["AppType"], Mapping[str, Any]]
        | Callable[["AppType"], Awaitable[Mapping[str, Any]]]
        | Callable[["AppType"], None]
        | Callable[["AppType"], Awaitable[None]],
    ) -> (
        Callable[["AppType"], Mapping[str, Any]]
        | Callable[["AppType"], Awaitable[Mapping[str, Any]]]
        | Callable[["AppType"], None]
        | Callable[["AppType"], Awaitable[None]]
    ):
        """Register a hook to run after the lifespan's broker start step.

        The hook is called with the application. A mapping it returns is
        merged into the lifespan state; a falsy result is ignored. The hook
        is stored via ``to_async`` and the original ``func`` is returned
        unchanged, so this method can be used as a decorator.
        """
        self._after_startup_hooks.append(to_async(func))
        return func

    @overload
    def on_broker_shutdown(
        self,
        func: Callable[["AppType"], None],
    ) -> Callable[["AppType"], None]: ...

    @overload
    def on_broker_shutdown(
        self,
        func: Callable[["AppType"], Awaitable[None]],
    ) -> Callable[["AppType"], Awaitable[None]]: ...

    def on_broker_shutdown(
        self,
        func: Callable[["AppType"], None] | Callable[["AppType"], Awaitable[None]],
    ) -> Callable[["AppType"], None] | Callable[["AppType"], Awaitable[None]]:
        """Register a hook to run before the broker is stopped.

        The hook is called with the application when the lifespan exits
        cleanly. It is stored via ``to_async`` and the original ``func`` is
        returned unchanged, so this method can be used as a decorator.
        """
        self._on_shutdown_hooks.append(to_async(func))
        return func

    @abstractmethod
    def publisher(self) -> "PublisherUsecase":
        """Create Publisher object.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError

    def _asyncapi_router(self, schema_url: str | None) -> APIRouter | None:
        """Build the router that serves the AsyncAPI documentation.

        Args:
            schema_url: Base path of the documentation routes.

        Returns:
            ``None`` when the router is excluded from the schema or
            ``schema_url`` is falsy. Otherwise an ``APIRouter`` with three GET
            routes: ``schema_url`` (HTML page), ``schema_url + ".json"`` and
            ``schema_url + ".yaml"`` (schema downloads).
        """
        if not self.include_in_schema or not schema_url:
            return None

        def download_app_json_schema() -> Response:
            """Return the specification as indented JSON, served as a download."""
            return Response(
                content=json.dumps(
                    self.schema.to_specification().to_jsonable(),
                    indent=2,
                ),
                headers={"Content-Type": "application/octet-stream"},
            )

        def download_app_yaml_schema() -> Response:
            """Return the specification as YAML, served as a download."""
            return Response(
                content=self.schema.to_specification().to_yaml(),
                headers={
                    "Content-Type": "application/octet-stream",
                },
            )

        def serve_asyncapi_schema(
            sidebar: bool = True,
            info: bool = True,
            servers: bool = True,
            operations: bool = True,
            messages: bool = True,
            schemas: bool = True,
            errors: bool = True,
            expandMessageExamples: bool = True,
        ) -> HTMLResponse:
            """Serve the AsyncAPI schema as an HTML response."""
            return HTMLResponse(
                content=get_asyncapi_html(
                    self.schema.to_specification(),
                    sidebar=sidebar,
                    info=info,
                    servers=servers,
                    operations=operations,
                    messages=messages,
                    schemas=schemas,
                    errors=errors,
                    expand_message_examples=expandMessageExamples,
                ),
            )

        docs_router = APIRouter(
            prefix=self.prefix,
            tags=["asyncapi"],
            redirect_slashes=self.redirect_slashes,
            default=self.default,
            deprecated=self.deprecated,
        )
        docs_router.get(schema_url)(serve_asyncapi_schema)
        docs_router.get(f"{schema_url}.json")(download_app_json_schema)
        docs_router.get(f"{schema_url}.yaml")(download_app_yaml_schema)
        return docs_router

    def include_router(  # type: ignore[override]
        self,
        router: Union["StreamRouter[MsgType]", "BrokerRouter[MsgType]"],
        *,
        prefix: str = "",
        tags: list[str | Enum] | None = None,
        dependencies: Sequence["params.Depends"] | None = None,
        default_response_class: type[Response] = Default(JSONResponse),
        responses: dict[int | str, dict[str, Any]] | None = None,
        callbacks: list["BaseRoute"] | None = None,
        deprecated: bool | None = None,
        include_in_schema: bool = True,
        generate_unique_id_function: Callable[["APIRoute"], str] = Default(
            generate_unique_id,
        ),
    ) -> None:
        """Include a broker router's subscribers into this router's broker.

        Every subscriber of ``router`` gets a default compatibility wrapper
        prepended to its call decorators, then the router is included into
        ``self.broker``.

        Args:
            router: The router to include. Only ``BrokerRouter`` instances are
                accepted.
            prefix: Accepted but not used by this implementation; the same
                holds for every other keyword-only option of this method.

        Raises:
            TypeError: If ``router`` is not a ``BrokerRouter`` (for example
                another ``StreamRouter``).
        """
        if isinstance(router, BrokerRouter):
            for sub in router.subscribers:
                sub._call_decorators = (
                    self._subscriber_compatibility_wrapper(),
                    *sub._call_decorators,
                )

            self.broker.include_router(router)
            return

        msg = (
            "Including a StreamRouter into another StreamRouter is not supported "
            "and may cause subtle context issues (e.g. message dependencies "
            "returning EmptyPlaceholder). "
            "Use a regular broker router (e.g. KafkaRouter, RabbitRouter, etc.) "
            "for grouping subscribers and include that into the StreamRouter instead. "
            "See: https://faststream.ag2.ai/latest/getting-started/integrations/fastapi/#multiple-routers"
        )
        raise TypeError(msg)