Coverage for faststream / asgi / app.py: 78%
128 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import inspect
2import logging
3import traceback
4from abc import abstractmethod
5from collections.abc import AsyncIterator, Sequence
6from contextlib import asynccontextmanager
7from typing import TYPE_CHECKING, Any, Optional, Protocol
9import anyio
10from fast_depends import Provider, dependency_provider
12from faststream._internal._compat import HAS_TYPER, HAS_UVICORN, ExceptionGroup, uvicorn
13from faststream._internal.application import Application
14from faststream._internal.constants import EMPTY
15from faststream._internal.context import ContextRepo
16from faststream._internal.di import FastDependsConfig
17from faststream._internal.logger import logger
18from faststream.exceptions import INSTALL_UVICORN, StartupValidationError
20from .factories import AsyncAPIRoute, make_try_it_out_handler
21from .handlers import HttpHandler
22from .response import AsgiResponse
23from .websocket import WebSocketClose
25if TYPE_CHECKING:
26 from types import FrameType
28 from anyio.abc import TaskStatus
29 from fast_depends.library.serializer import SerializerProto
31 from faststream._internal.basic_types import (
32 AnyCallable,
33 Lifespan,
34 LoggerProto,
35 SettingField,
36 )
37 from faststream._internal.broker import BrokerUsecase
38 from faststream.specification.base import SpecificationFactory
40 class UvicornServerProtocol(Protocol):
41 should_exit: bool
42 force_exit: bool
44 def handle_exit(self, sig: int, frame: FrameType | None) -> None: ...
46 from .types import ASGIApp, Receive, Scope, Send
49class ServerState(Protocol):
50 extra_options: dict[str, "SettingField"]
52 @abstractmethod
53 def stop(self) -> None: ...
56class OuterRunState(ServerState):
57 def __init__(self) -> None:
58 self.extra_options = {}
60 def stop(self) -> None:
61 # TODO: resend signal to outer uvicorn
62 pass
65class CliRunState(ServerState):
66 def __init__(
67 self,
68 server: "UvicornServerProtocol",
69 extra_options: dict[str, "SettingField"],
70 ) -> None:
71 self.server = server
72 self.extra_options = extra_options
74 def stop(self) -> None:
75 self.server.should_exit = True
78def cast_uvicorn_params(params: dict[str, Any]) -> dict[str, Any]:
79 if port := params.get("port"):
80 params["port"] = int(port)
81 if fd := params.get("fd"):
82 params["fd"] = int(fd)
83 if (access_log := params.get("access_log", EMPTY)) is not EMPTY:
84 params["access_log"] = access_log.lower() not in {"false", ""}
85 return params
88class AsgiFastStream(Application):
89 _server: ServerState
91 def __init__(
92 self,
93 broker: Optional["BrokerUsecase[Any, Any]"] = None,
94 /,
95 asgi_routes: Sequence[tuple[str, "ASGIApp"]] = (),
96 logger: Optional["LoggerProto"] = logger,
97 provider: Provider | None = None,
98 serializer: Optional["SerializerProto"] = EMPTY,
99 context: ContextRepo | None = None,
100 lifespan: Optional["Lifespan"] = None,
101 on_startup: Sequence["AnyCallable"] = (),
102 after_startup: Sequence["AnyCallable"] = (),
103 on_shutdown: Sequence["AnyCallable"] = (),
104 after_shutdown: Sequence["AnyCallable"] = (),
105 specification: Optional["SpecificationFactory"] = None,
106 asyncapi_path: str | AsyncAPIRoute | None = None,
107 ) -> None:
108 self.routes = list(asgi_routes)
110 super().__init__(
111 broker,
112 logger=logger,
113 config=FastDependsConfig(
114 provider=provider or dependency_provider,
115 context=context or ContextRepo(),
116 serializer=serializer,
117 ),
118 lifespan=lifespan,
119 on_startup=on_startup,
120 after_startup=after_startup,
121 on_shutdown=on_shutdown,
122 after_shutdown=after_shutdown,
123 specification=specification,
124 )
126 if asyncapi_path:
127 asyncapi_route = AsyncAPIRoute.ensure_route(asyncapi_path)
128 handler = asyncapi_route(self.schema)
129 handler.set_logger(logger)
130 self.routes.append((asyncapi_route.path, handler))
132 if asyncapi_route.try_it_out and self.broker is not None:
133 try_it_out_route = make_try_it_out_handler(
134 self.broker,
135 include_in_schema=asyncapi_route.include_in_schema,
136 )
138 try_it_out_route.update_fd_config(self.config)
139 try_it_out_route.set_logger(logger)
141 self.routes.append((
142 asyncapi_route.try_it_out_url,
143 try_it_out_route,
144 ))
146 self._server = OuterRunState()
148 self._log_level: int = logging.INFO
149 self._run_extra_options: dict[str, SettingField] = {}
151 def _init_setupable_( # noqa: PLW3201
152 self,
153 broker: Optional["BrokerUsecase[Any, Any]"] = None,
154 /,
155 specification: Optional["SpecificationFactory"] = None,
156 config: Optional["FastDependsConfig"] = None,
157 ) -> None:
158 super()._init_setupable_(broker, specification, config)
159 for route in self.routes:
160 self._register_route(route)
162 @classmethod
163 def from_app(
164 cls,
165 app: Application,
166 asgi_routes: Sequence[tuple[str, "ASGIApp"]],
167 asyncapi_path: str | AsyncAPIRoute | None = None,
168 ) -> "AsgiFastStream":
169 asgi_app = cls(
170 app.broker,
171 asgi_routes=asgi_routes,
172 asyncapi_path=asyncapi_path,
173 logger=app.logger,
174 lifespan=None,
175 )
176 asgi_app.lifespan_context = app.lifespan_context
177 asgi_app._on_startup_calling = app._on_startup_calling
178 asgi_app._after_startup_calling = app._after_startup_calling
179 asgi_app._on_shutdown_calling = app._on_shutdown_calling
180 asgi_app._after_shutdown_calling = app._after_shutdown_calling
181 return asgi_app
183 def mount(self, path: str, route: "ASGIApp") -> None:
184 asgi_route = (path, route)
185 self.routes.append(asgi_route)
186 self._register_route(asgi_route)
188 def _register_route(self, asgi_route: tuple[str, "ASGIApp"]) -> None:
189 path, route = asgi_route
190 if isinstance(route, HttpHandler):
191 self.schema.add_http_route(path, route)
192 route.update_fd_config(self.config)
193 route.set_logger(self.logger)
195 async def __call__(
196 self,
197 scope: "Scope",
198 receive: "Receive",
199 send: "Send",
200 ) -> None:
201 """ASGI implementation."""
202 if scope["type"] == "lifespan":
203 await self.lifespan(scope, receive, send)
204 return
206 if scope["type"] == "http":
207 for path, app in self.routes:
208 if scope["path"] == path:
209 await app(scope, receive, send)
210 return
212 await self.not_found(scope, receive, send)
213 return
215 async def run(
216 self,
217 log_level: int = logging.INFO,
218 run_extra_options: dict[str, "SettingField"] | None = None,
219 ) -> None:
220 if not HAS_UVICORN:
221 raise ImportError(INSTALL_UVICORN)
223 self._log_level = log_level
224 self._run_extra_options = cast_uvicorn_params(run_extra_options or {})
226 config = uvicorn.Config(
227 app=self,
228 log_level=self._log_level,
229 **{
230 key: v
231 for key, v in self._run_extra_options.items()
232 if key in set(inspect.signature(uvicorn.Config).parameters.keys())
233 },
234 )
236 server = uvicorn.Server(config)
237 await server.serve()
239 def exit(self) -> None:
240 """Manual stop method."""
241 self._server.stop()
243 @asynccontextmanager
244 async def start_lifespan_context(
245 self,
246 run_extra_options: dict[str, "SettingField"] | None = None,
247 ) -> AsyncIterator[None]:
248 run_extra_options = run_extra_options or self._run_extra_options
250 async with self.lifespan_context(**run_extra_options):
251 try:
252 async with anyio.create_task_group() as tg:
253 await tg.start(self.__start, logging.INFO, run_extra_options)
255 try:
256 yield
257 finally:
258 await self._shutdown()
259 tg.cancel_scope.cancel()
261 except ExceptionGroup as e:
262 for ex in e.exceptions:
263 raise ex from ex.__cause__
265 async def __start(
266 self,
267 log_level: int,
268 run_extra_options: dict[str, "SettingField"],
269 *,
270 task_status: "TaskStatus[None]" = anyio.TASK_STATUS_IGNORED,
271 ) -> None:
272 """Redefenition of `_startup` method.
274 Waits for hooks run before broker start.
275 """
276 async with (
277 self._startup_logging(log_level=log_level),
278 self._start_hooks_context(**run_extra_options),
279 ):
280 task_status.started()
281 await self._start_broker()
283 async def lifespan(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
284 """Handle ASGI lifespan messages to start and shutdown the app."""
285 started = False
286 await receive() # handle `lifespan.startup` event
288 async def process_exception(ex: BaseException) -> None:
289 exc_text = traceback.format_exc()
290 if started: 290 ↛ 291line 290 didn't jump to line 291 because the condition on line 290 was never true
291 await send({"type": "lifespan.shutdown.failed", "message": exc_text})
292 else:
293 await send({"type": "lifespan.startup.failed", "message": exc_text})
294 raise ex
296 try:
297 async with self.start_lifespan_context():
298 await send({"type": "lifespan.startup.complete"})
299 started = True
300 await receive() # handle `lifespan.shutdown` event
302 except StartupValidationError as startup_exc:
303 # Process `on_startup` and `lifespan` missed extra options
304 if HAS_TYPER:
305 from faststream._internal.cli.utils.errors import draw_startup_errors
307 draw_startup_errors(startup_exc)
308 await send({"type": "lifespan.startup.failed", "message": ""})
310 else:
311 await process_exception(startup_exc)
313 except BaseException as base_exc:
314 await process_exception(base_exc)
316 else:
317 await send({"type": "lifespan.shutdown.complete"})
319 async def not_found(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
320 not_found_msg = "Application doesn't support regular HTTP protocol."
322 if scope["type"] == "websocket":
323 websocket_close = WebSocketClose(
324 code=1000,
325 reason=not_found_msg,
326 )
327 await websocket_close(scope, receive, send)
328 return
330 response = AsgiResponse(
331 body=not_found_msg.encode(),
332 status_code=404,
333 )
335 await response(scope, receive, send)