Coverage for faststream / asgi / app.py: 78%

128 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import inspect 

2import logging 

3import traceback 

4from abc import abstractmethod 

5from collections.abc import AsyncIterator, Sequence 

6from contextlib import asynccontextmanager 

7from typing import TYPE_CHECKING, Any, Optional, Protocol 

8 

9import anyio 

10from fast_depends import Provider, dependency_provider 

11 

12from faststream._internal._compat import HAS_TYPER, HAS_UVICORN, ExceptionGroup, uvicorn 

13from faststream._internal.application import Application 

14from faststream._internal.constants import EMPTY 

15from faststream._internal.context import ContextRepo 

16from faststream._internal.di import FastDependsConfig 

17from faststream._internal.logger import logger 

18from faststream.exceptions import INSTALL_UVICORN, StartupValidationError 

19 

20from .factories import AsyncAPIRoute, make_try_it_out_handler 

21from .handlers import HttpHandler 

22from .response import AsgiResponse 

23from .websocket import WebSocketClose 

24 

25if TYPE_CHECKING: 

26 from types import FrameType 

27 

28 from anyio.abc import TaskStatus 

29 from fast_depends.library.serializer import SerializerProto 

30 

31 from faststream._internal.basic_types import ( 

32 AnyCallable, 

33 Lifespan, 

34 LoggerProto, 

35 SettingField, 

36 ) 

37 from faststream._internal.broker import BrokerUsecase 

38 from faststream.specification.base import SpecificationFactory 

39 

40 class UvicornServerProtocol(Protocol): 

41 should_exit: bool 

42 force_exit: bool 

43 

44 def handle_exit(self, sig: int, frame: FrameType | None) -> None: ... 

45 

46 from .types import ASGIApp, Receive, Scope, Send 

47 

48 

49class ServerState(Protocol): 

50 extra_options: dict[str, "SettingField"] 

51 

52 @abstractmethod 

53 def stop(self) -> None: ... 

54 

55 

56class OuterRunState(ServerState): 

57 def __init__(self) -> None: 

58 self.extra_options = {} 

59 

60 def stop(self) -> None: 

61 # TODO: resend signal to outer uvicorn 

62 pass 

63 

64 

65class CliRunState(ServerState): 

66 def __init__( 

67 self, 

68 server: "UvicornServerProtocol", 

69 extra_options: dict[str, "SettingField"], 

70 ) -> None: 

71 self.server = server 

72 self.extra_options = extra_options 

73 

74 def stop(self) -> None: 

75 self.server.should_exit = True 

76 

77 

78def cast_uvicorn_params(params: dict[str, Any]) -> dict[str, Any]: 

79 if port := params.get("port"): 

80 params["port"] = int(port) 

81 if fd := params.get("fd"): 

82 params["fd"] = int(fd) 

83 if (access_log := params.get("access_log", EMPTY)) is not EMPTY: 

84 params["access_log"] = access_log.lower() not in {"false", ""} 

85 return params 

86 

87 

88class AsgiFastStream(Application): 

89 _server: ServerState 

90 

91 def __init__( 

92 self, 

93 broker: Optional["BrokerUsecase[Any, Any]"] = None, 

94 /, 

95 asgi_routes: Sequence[tuple[str, "ASGIApp"]] = (), 

96 logger: Optional["LoggerProto"] = logger, 

97 provider: Provider | None = None, 

98 serializer: Optional["SerializerProto"] = EMPTY, 

99 context: ContextRepo | None = None, 

100 lifespan: Optional["Lifespan"] = None, 

101 on_startup: Sequence["AnyCallable"] = (), 

102 after_startup: Sequence["AnyCallable"] = (), 

103 on_shutdown: Sequence["AnyCallable"] = (), 

104 after_shutdown: Sequence["AnyCallable"] = (), 

105 specification: Optional["SpecificationFactory"] = None, 

106 asyncapi_path: str | AsyncAPIRoute | None = None, 

107 ) -> None: 

108 self.routes = list(asgi_routes) 

109 

110 super().__init__( 

111 broker, 

112 logger=logger, 

113 config=FastDependsConfig( 

114 provider=provider or dependency_provider, 

115 context=context or ContextRepo(), 

116 serializer=serializer, 

117 ), 

118 lifespan=lifespan, 

119 on_startup=on_startup, 

120 after_startup=after_startup, 

121 on_shutdown=on_shutdown, 

122 after_shutdown=after_shutdown, 

123 specification=specification, 

124 ) 

125 

126 if asyncapi_path: 

127 asyncapi_route = AsyncAPIRoute.ensure_route(asyncapi_path) 

128 handler = asyncapi_route(self.schema) 

129 handler.set_logger(logger) 

130 self.routes.append((asyncapi_route.path, handler)) 

131 

132 if asyncapi_route.try_it_out and self.broker is not None: 

133 try_it_out_route = make_try_it_out_handler( 

134 self.broker, 

135 include_in_schema=asyncapi_route.include_in_schema, 

136 ) 

137 

138 try_it_out_route.update_fd_config(self.config) 

139 try_it_out_route.set_logger(logger) 

140 

141 self.routes.append(( 

142 asyncapi_route.try_it_out_url, 

143 try_it_out_route, 

144 )) 

145 

146 self._server = OuterRunState() 

147 

148 self._log_level: int = logging.INFO 

149 self._run_extra_options: dict[str, SettingField] = {} 

150 

151 def _init_setupable_( # noqa: PLW3201 

152 self, 

153 broker: Optional["BrokerUsecase[Any, Any]"] = None, 

154 /, 

155 specification: Optional["SpecificationFactory"] = None, 

156 config: Optional["FastDependsConfig"] = None, 

157 ) -> None: 

158 super()._init_setupable_(broker, specification, config) 

159 for route in self.routes: 

160 self._register_route(route) 

161 

162 @classmethod 

163 def from_app( 

164 cls, 

165 app: Application, 

166 asgi_routes: Sequence[tuple[str, "ASGIApp"]], 

167 asyncapi_path: str | AsyncAPIRoute | None = None, 

168 ) -> "AsgiFastStream": 

169 asgi_app = cls( 

170 app.broker, 

171 asgi_routes=asgi_routes, 

172 asyncapi_path=asyncapi_path, 

173 logger=app.logger, 

174 lifespan=None, 

175 ) 

176 asgi_app.lifespan_context = app.lifespan_context 

177 asgi_app._on_startup_calling = app._on_startup_calling 

178 asgi_app._after_startup_calling = app._after_startup_calling 

179 asgi_app._on_shutdown_calling = app._on_shutdown_calling 

180 asgi_app._after_shutdown_calling = app._after_shutdown_calling 

181 return asgi_app 

182 

183 def mount(self, path: str, route: "ASGIApp") -> None: 

184 asgi_route = (path, route) 

185 self.routes.append(asgi_route) 

186 self._register_route(asgi_route) 

187 

188 def _register_route(self, asgi_route: tuple[str, "ASGIApp"]) -> None: 

189 path, route = asgi_route 

190 if isinstance(route, HttpHandler): 

191 self.schema.add_http_route(path, route) 

192 route.update_fd_config(self.config) 

193 route.set_logger(self.logger) 

194 

195 async def __call__( 

196 self, 

197 scope: "Scope", 

198 receive: "Receive", 

199 send: "Send", 

200 ) -> None: 

201 """ASGI implementation.""" 

202 if scope["type"] == "lifespan": 

203 await self.lifespan(scope, receive, send) 

204 return 

205 

206 if scope["type"] == "http": 

207 for path, app in self.routes: 

208 if scope["path"] == path: 

209 await app(scope, receive, send) 

210 return 

211 

212 await self.not_found(scope, receive, send) 

213 return 

214 

215 async def run( 

216 self, 

217 log_level: int = logging.INFO, 

218 run_extra_options: dict[str, "SettingField"] | None = None, 

219 ) -> None: 

220 if not HAS_UVICORN: 

221 raise ImportError(INSTALL_UVICORN) 

222 

223 self._log_level = log_level 

224 self._run_extra_options = cast_uvicorn_params(run_extra_options or {}) 

225 

226 config = uvicorn.Config( 

227 app=self, 

228 log_level=self._log_level, 

229 **{ 

230 key: v 

231 for key, v in self._run_extra_options.items() 

232 if key in set(inspect.signature(uvicorn.Config).parameters.keys()) 

233 }, 

234 ) 

235 

236 server = uvicorn.Server(config) 

237 await server.serve() 

238 

239 def exit(self) -> None: 

240 """Manual stop method.""" 

241 self._server.stop() 

242 

243 @asynccontextmanager 

244 async def start_lifespan_context( 

245 self, 

246 run_extra_options: dict[str, "SettingField"] | None = None, 

247 ) -> AsyncIterator[None]: 

248 run_extra_options = run_extra_options or self._run_extra_options 

249 

250 async with self.lifespan_context(**run_extra_options): 

251 try: 

252 async with anyio.create_task_group() as tg: 

253 await tg.start(self.__start, logging.INFO, run_extra_options) 

254 

255 try: 

256 yield 

257 finally: 

258 await self._shutdown() 

259 tg.cancel_scope.cancel() 

260 

261 except ExceptionGroup as e: 

262 for ex in e.exceptions: 

263 raise ex from ex.__cause__ 

264 

265 async def __start( 

266 self, 

267 log_level: int, 

268 run_extra_options: dict[str, "SettingField"], 

269 *, 

270 task_status: "TaskStatus[None]" = anyio.TASK_STATUS_IGNORED, 

271 ) -> None: 

272 """Redefenition of `_startup` method. 

273 

274 Waits for hooks run before broker start. 

275 """ 

276 async with ( 

277 self._startup_logging(log_level=log_level), 

278 self._start_hooks_context(**run_extra_options), 

279 ): 

280 task_status.started() 

281 await self._start_broker() 

282 

283 async def lifespan(self, scope: "Scope", receive: "Receive", send: "Send") -> None: 

284 """Handle ASGI lifespan messages to start and shutdown the app.""" 

285 started = False 

286 await receive() # handle `lifespan.startup` event 

287 

288 async def process_exception(ex: BaseException) -> None: 

289 exc_text = traceback.format_exc() 

290 if started: 290 ↛ 291line 290 didn't jump to line 291 because the condition on line 290 was never true

291 await send({"type": "lifespan.shutdown.failed", "message": exc_text}) 

292 else: 

293 await send({"type": "lifespan.startup.failed", "message": exc_text}) 

294 raise ex 

295 

296 try: 

297 async with self.start_lifespan_context(): 

298 await send({"type": "lifespan.startup.complete"}) 

299 started = True 

300 await receive() # handle `lifespan.shutdown` event 

301 

302 except StartupValidationError as startup_exc: 

303 # Process `on_startup` and `lifespan` missed extra options 

304 if HAS_TYPER: 

305 from faststream._internal.cli.utils.errors import draw_startup_errors 

306 

307 draw_startup_errors(startup_exc) 

308 await send({"type": "lifespan.startup.failed", "message": ""}) 

309 

310 else: 

311 await process_exception(startup_exc) 

312 

313 except BaseException as base_exc: 

314 await process_exception(base_exc) 

315 

316 else: 

317 await send({"type": "lifespan.shutdown.complete"}) 

318 

319 async def not_found(self, scope: "Scope", receive: "Receive", send: "Send") -> None: 

320 not_found_msg = "Application doesn't support regular HTTP protocol." 

321 

322 if scope["type"] == "websocket": 

323 websocket_close = WebSocketClose( 

324 code=1000, 

325 reason=not_found_msg, 

326 ) 

327 await websocket_close(scope, receive, send) 

328 return 

329 

330 response = AsgiResponse( 

331 body=not_found_msg.encode(), 

332 status_code=404, 

333 ) 

334 

335 await response(scope, receive, send)