Coverage for faststream / _internal / fastapi / router.py: 96%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import json 

2import warnings 

3from abc import abstractmethod 

4from collections.abc import ( 

5 AsyncIterator, 

6 Awaitable, 

7 Callable, 

8 Iterable, 

9 Mapping, 

10 Sequence, 

11) 

12from contextlib import asynccontextmanager 

13from enum import Enum 

14from typing import ( 

15 TYPE_CHECKING, 

16 Any, 

17 Generic, 

18 Optional, 

19 Union, 

20 cast, 

21 overload, 

22) 

23 

24from fastapi.datastructures import Default 

25from fastapi.responses import HTMLResponse 

26from fastapi.routing import APIRoute, APIRouter 

27from fastapi.utils import generate_unique_id 

28from starlette.responses import JSONResponse, Response 

29from starlette.routing import BaseRoute, _DefaultLifespan 

30 

31from faststream._internal.application import StartAbleApplication 

32from faststream._internal.broker import BrokerRouter 

33from faststream._internal.context import ContextRepo 

34from faststream._internal.di.config import FastDependsConfig 

35from faststream._internal.types import ( 

36 MsgType, 

37 P_HandlerParams, 

38 T_HandlerReturn, 

39) 

40from faststream._internal.utils.functions import to_async 

41from faststream.middlewares import BaseMiddleware 

42from faststream.specification.asyncapi.site import get_asyncapi_html 

43 

44from .config import FastAPIConfig 

45from .get_dependant import get_fastapi_dependant 

46from .route import wrap_callable_to_fastapi_compatible 

47 

48if TYPE_CHECKING: 

49 from types import TracebackType 

50 

51 from fastapi import FastAPI, params 

52 from fastapi.background import BackgroundTasks 

53 from fastapi.types import IncEx 

54 from starlette import routing 

55 from starlette.types import ASGIApp, AppType, Lifespan 

56 

57 from faststream._internal.broker import BrokerUsecase 

58 from faststream._internal.endpoint.call_wrapper import HandlerCallWrapper 

59 from faststream._internal.endpoint.publisher import PublisherUsecase 

60 from faststream._internal.proto import NameRequired 

61 from faststream._internal.types import BrokerMiddleware 

62 from faststream.message import StreamMessage 

63 from faststream.specification.base import SpecificationFactory 

64 from faststream.specification.schema import Tag, TagDict 

65 

66 

67class _BackgroundMiddleware(BaseMiddleware): 

68 async def __aexit__( 

69 self, 

70 exc_type: type[BaseException] | None = None, 

71 exc_val: BaseException | None = None, 

72 exc_tb: Optional["TracebackType"] = None, 

73 ) -> bool | None: 

74 if not exc_type and ( 

75 background := cast( 

76 "BackgroundTasks | None", 

77 getattr(self.context.get_local("message"), "background", None), 

78 ) 

79 ): 

80 await background() 

81 

82 return await super().after_processed(exc_type, exc_val, exc_tb) 

83 

84 

85class StreamRouter(APIRouter, StartAbleApplication, Generic[MsgType]): 

86 """A class to route streams.""" 

87 

88 broker_class: type["BrokerUsecase[MsgType, Any]"] 

89 broker: "BrokerUsecase[MsgType, Any]" 

90 docs_router: APIRouter | None 

91 _after_startup_hooks: list[Callable[[Any], Awaitable[Mapping[str, Any] | None]]] 

92 _on_shutdown_hooks: list[Callable[[Any], Awaitable[None]]] 

93 

94 title: str 

95 description: str 

96 version: str 

97 license: dict[str, Any] | None 

98 contact: dict[str, Any] | None 

99 

100 def __init__( 

101 self, 

102 *connection_args: Any, 

103 context: ContextRepo | None, 

104 middlewares: Sequence["BrokerMiddleware[MsgType]"] = (), 

105 prefix: str = "", 

106 tags: list[str | Enum] | None = None, 

107 dependencies: Sequence["params.Depends"] | None = None, 

108 default_response_class: type["Response"] = Default(JSONResponse), 

109 responses: dict[int | str, dict[str, Any]] | None = None, 

110 callbacks: list["routing.BaseRoute"] | None = None, 

111 routes: list["routing.BaseRoute"] | None = None, 

112 redirect_slashes: bool = True, 

113 default: Optional["ASGIApp"] = None, 

114 dependency_overrides_provider: Any | None = None, 

115 route_class: type["APIRoute"] = APIRoute, 

116 on_startup: Sequence[Callable[[], Any]] | None = None, 

117 on_shutdown: Sequence[Callable[[], Any]] | None = None, 

118 deprecated: bool | None = None, 

119 include_in_schema: bool = True, 

120 setup_state: bool = True, 

121 lifespan: Optional["Lifespan[Any]"] = None, 

122 generate_unique_id_function: Callable[["APIRoute"], str] = Default( 

123 generate_unique_id, 

124 ), 

125 # Specification information 

126 specification: Optional["SpecificationFactory"] = None, 

127 specification_tags: Iterable[Union["Tag", "TagDict"]] = (), 

128 schema_url: str | None = "/asyncapi", 

129 **connection_kwars: Any, 

130 ) -> None: 

131 broker = self.broker_class( 

132 *connection_args, 

133 middlewares=( 

134 *middlewares, 

135 # allow to catch background exceptions in user middlewares 

136 _BackgroundMiddleware, 

137 ), 

138 tags=specification_tags, 

139 apply_types=False, 

140 **connection_kwars, 

141 ) 

142 

143 self._init_setupable_( 

144 broker, 

145 config=FastDependsConfig( 

146 get_dependent=get_fastapi_dependant, 

147 context=context or ContextRepo(), 

148 ), 

149 specification=specification, 

150 ) 

151 

152 self.setup_state = setup_state 

153 

154 super().__init__( 

155 prefix=prefix, 

156 tags=tags, 

157 dependencies=dependencies, 

158 default_response_class=default_response_class, 

159 responses=responses, 

160 callbacks=callbacks, 

161 routes=routes, 

162 redirect_slashes=redirect_slashes, 

163 default=default, 

164 dependency_overrides_provider=dependency_overrides_provider, 

165 route_class=route_class, 

166 deprecated=deprecated, 

167 include_in_schema=include_in_schema, 

168 generate_unique_id_function=generate_unique_id_function, 

169 lifespan=self._wrap_lifespan(lifespan), 

170 on_startup=on_startup, 

171 on_shutdown=on_shutdown, 

172 ) 

173 

174 self.fastapi_config = FastAPIConfig( 

175 dependency_overrides_provider=dependency_overrides_provider, 

176 ) 

177 

178 if self.include_in_schema: 

179 self.docs_router = self._asyncapi_router(schema_url) 

180 else: 

181 self.docs_router = None 

182 

183 self._after_startup_hooks = [] 

184 self._on_shutdown_hooks = [] 

185 

186 self._lifespan_started = False 

187 

188 def _subscriber_compatibility_wrapper( 

189 self, 

190 dependencies: Iterable["params.Depends"] = (), 

191 response_model: Any = Default(None), 

192 response_model_include: Optional["IncEx"] = None, 

193 response_model_exclude: Optional["IncEx"] = None, 

194 response_model_by_alias: bool = True, 

195 response_model_exclude_unset: bool = False, 

196 response_model_exclude_defaults: bool = False, 

197 response_model_exclude_none: bool = False, 

198 ) -> Callable[ 

199 [Callable[..., Any]], 

200 Callable[["StreamMessage[Any]"], Awaitable[Any]], 

201 ]: 

202 """Decorator before `broker.subscriber`, that wraps function to FastAPI-compatible one.""" 

203 

204 def wrapper( 

205 endpoint: Callable[..., Any], 

206 ) -> Callable[["StreamMessage[Any]"], Awaitable[Any]]: 

207 """Patch user function to make it FastAPI-compatible.""" 

208 return wrap_callable_to_fastapi_compatible( 

209 user_callable=endpoint, 

210 dependencies=dependencies, 

211 response_model=response_model, 

212 response_model_include=response_model_include, 

213 response_model_exclude=response_model_exclude, 

214 response_model_by_alias=response_model_by_alias, 

215 response_model_exclude_unset=response_model_exclude_unset, 

216 response_model_exclude_defaults=response_model_exclude_defaults, 

217 response_model_exclude_none=response_model_exclude_none, 

218 context=self.context, 

219 fastapi_config=self.fastapi_config, 

220 ) 

221 

222 return wrapper 

223 

224 def subscriber( 

225 self, 

226 *extra: Union["NameRequired", str], 

227 dependencies: Iterable["params.Depends"], 

228 response_model: Any, 

229 response_model_include: Optional["IncEx"], 

230 response_model_exclude: Optional["IncEx"], 

231 response_model_by_alias: bool, 

232 response_model_exclude_unset: bool, 

233 response_model_exclude_defaults: bool, 

234 response_model_exclude_none: bool, 

235 **broker_kwargs: Any, 

236 ) -> Callable[ 

237 [Callable[P_HandlerParams, T_HandlerReturn]], 

238 "HandlerCallWrapper[P_HandlerParams, T_HandlerReturn]", 

239 ]: 

240 """A function decorator for subscribing to a message queue.""" 

241 dependencies = (*self.dependencies, *dependencies) 

242 

243 sub = self.broker.subscriber( # type: ignore[call-arg] 

244 *extra, # type: ignore[arg-type] 

245 dependencies=dependencies, 

246 **broker_kwargs, 

247 ) 

248 

249 sub._call_decorators = ( 

250 self._subscriber_compatibility_wrapper( 

251 dependencies=dependencies, 

252 response_model=response_model, 

253 response_model_include=response_model_include, 

254 response_model_exclude=response_model_exclude, 

255 response_model_by_alias=response_model_by_alias, 

256 response_model_exclude_unset=response_model_exclude_unset, 

257 response_model_exclude_defaults=response_model_exclude_defaults, 

258 response_model_exclude_none=response_model_exclude_none, 

259 ), 

260 *sub._call_decorators, 

261 ) 

262 

263 return sub 

264 

265 def _wrap_lifespan( 

266 self, 

267 lifespan: Optional["Lifespan[Any]"] = None, 

268 ) -> "Lifespan[Any]": 

269 lifespan_context = lifespan if lifespan is not None else _DefaultLifespan(self) 

270 

271 @asynccontextmanager 

272 async def start_broker_lifespan( 

273 app: "FastAPI", 

274 ) -> AsyncIterator[Mapping[str, Any] | None]: 

275 """Starts the lifespan of a broker.""" 

276 self.fastapi_config.set_application(app) 

277 

278 if self.docs_router: 

279 self.schema.title = app.title 

280 self.schema.description = app.description 

281 self.schema.version = app.version 

282 self.schema.contact = app.contact 

283 self.schema.license = app.license_info 

284 app.include_router(self.docs_router) 

285 

286 async with lifespan_context(app) as maybe_context: 

287 lifespan_extra = {"broker": self.broker, **(maybe_context or {})} 

288 

289 if not self._lifespan_started: 289 ↛ 293line 289 didn't jump to line 293 because the condition on line 289 was always true

290 await self._start_broker() 

291 self._lifespan_started = True 

292 else: 

293 warnings.warn( 

294 "Specifying 'lifespan_context' manually is no longer necessary with FastAPI >= 0.112.2.", 

295 category=RuntimeWarning, 

296 stacklevel=2, 

297 ) 

298 

299 for h in self._after_startup_hooks: 

300 lifespan_extra.update(await h(app) or {}) 

301 

302 try: 

303 if self.setup_state: 303 ↛ 307line 303 didn't jump to line 307 because the condition on line 303 was always true

304 yield lifespan_extra 

305 else: 

306 # NOTE: old asgi compatibility 

307 yield None 

308 

309 for h in self._on_shutdown_hooks: 

310 await h(app) 

311 

312 finally: 

313 await self.broker.stop() 

314 

315 return start_broker_lifespan # type: ignore[return-value] 

316 

317 @overload 

318 def after_startup( 

319 self, 

320 func: Callable[["AppType"], Mapping[str, Any]], 

321 ) -> Callable[["AppType"], Mapping[str, Any]]: ... 

322 

323 @overload 

324 def after_startup( 

325 self, 

326 func: Callable[["AppType"], Awaitable[Mapping[str, Any]]], 

327 ) -> Callable[["AppType"], Awaitable[Mapping[str, Any]]]: ... 

328 

329 @overload 

330 def after_startup( 

331 self, 

332 func: Callable[["AppType"], None], 

333 ) -> Callable[["AppType"], None]: ... 

334 

335 @overload 

336 def after_startup( 

337 self, 

338 func: Callable[["AppType"], Awaitable[None]], 

339 ) -> Callable[["AppType"], Awaitable[None]]: ... 

340 

341 def after_startup( 

342 self, 

343 func: Callable[["AppType"], Mapping[str, Any]] 

344 | Callable[["AppType"], Awaitable[Mapping[str, Any]]] 

345 | Callable[["AppType"], None] 

346 | Callable[["AppType"], Awaitable[None]], 

347 ) -> ( 

348 Callable[["AppType"], Mapping[str, Any]] 

349 | Callable[["AppType"], Awaitable[Mapping[str, Any]]] 

350 | Callable[["AppType"], None] 

351 | Callable[["AppType"], Awaitable[None]] 

352 ): 

353 """Register a function to be executed after startup.""" 

354 self._after_startup_hooks.append(to_async(func)) 

355 return func 

356 

357 @overload 

358 def on_broker_shutdown( 

359 self, 

360 func: Callable[["AppType"], None], 

361 ) -> Callable[["AppType"], None]: ... 

362 

363 @overload 

364 def on_broker_shutdown( 

365 self, 

366 func: Callable[["AppType"], Awaitable[None]], 

367 ) -> Callable[["AppType"], Awaitable[None]]: ... 

368 

369 def on_broker_shutdown( 

370 self, 

371 func: Callable[["AppType"], None] | Callable[["AppType"], Awaitable[None]], 

372 ) -> Callable[["AppType"], None] | Callable[["AppType"], Awaitable[None]]: 

373 """Register a function to be executed before broker stop.""" 

374 self._on_shutdown_hooks.append(to_async(func)) 

375 return func 

376 

377 @abstractmethod 

378 def publisher(self) -> "PublisherUsecase": 

379 """Create Publisher object.""" 

380 raise NotImplementedError 

381 

382 def _asyncapi_router(self, schema_url: str | None) -> APIRouter | None: 

383 """Creates an API router for serving AsyncAPI documentation.""" 

384 if not self.include_in_schema or not schema_url: 

385 return None 

386 

387 def download_app_json_schema() -> Response: 

388 return Response( 

389 content=json.dumps( 

390 self.schema.to_specification().to_jsonable(), 

391 indent=2, 

392 ), 

393 headers={"Content-Type": "application/octet-stream"}, 

394 ) 

395 

396 def download_app_yaml_schema() -> Response: 

397 return Response( 

398 content=self.schema.to_specification().to_yaml(), 

399 headers={ 

400 "Content-Type": "application/octet-stream", 

401 }, 

402 ) 

403 

404 def serve_asyncapi_schema( 

405 sidebar: bool = True, 

406 info: bool = True, 

407 servers: bool = True, 

408 operations: bool = True, 

409 messages: bool = True, 

410 schemas: bool = True, 

411 errors: bool = True, 

412 expandMessageExamples: bool = True, 

413 ) -> HTMLResponse: 

414 """Serve the AsyncAPI schema as an HTML response.""" 

415 return HTMLResponse( 

416 content=get_asyncapi_html( 

417 self.schema.to_specification(), 

418 sidebar=sidebar, 

419 info=info, 

420 servers=servers, 

421 operations=operations, 

422 messages=messages, 

423 schemas=schemas, 

424 errors=errors, 

425 expand_message_examples=expandMessageExamples, 

426 ), 

427 ) 

428 

429 docs_router = APIRouter( 

430 prefix=self.prefix, 

431 tags=["asyncapi"], 

432 redirect_slashes=self.redirect_slashes, 

433 default=self.default, 

434 deprecated=self.deprecated, 

435 ) 

436 docs_router.get(schema_url)(serve_asyncapi_schema) 

437 docs_router.get(f"{schema_url}.json")(download_app_json_schema) 

438 docs_router.get(f"{schema_url}.yaml")(download_app_yaml_schema) 

439 return docs_router 

440 

441 def include_router( # type: ignore[override] 

442 self, 

443 router: Union["StreamRouter[MsgType]", "BrokerRouter[MsgType]"], 

444 *, 

445 prefix: str = "", 

446 tags: list[str | Enum] | None = None, 

447 dependencies: Sequence["params.Depends"] | None = None, 

448 default_response_class: type[Response] = Default(JSONResponse), 

449 responses: dict[int | str, dict[str, Any]] | None = None, 

450 callbacks: list["BaseRoute"] | None = None, 

451 deprecated: bool | None = None, 

452 include_in_schema: bool = True, 

453 generate_unique_id_function: Callable[["APIRoute"], str] = Default( 

454 generate_unique_id, 

455 ), 

456 ) -> None: 

457 """Includes a router in the API.""" 

458 if isinstance(router, BrokerRouter): 

459 for sub in router.subscribers: 

460 sub._call_decorators = ( 

461 self._subscriber_compatibility_wrapper(), 

462 *sub._call_decorators, 

463 ) 

464 

465 self.broker.include_router(router) 

466 return 

467 

468 msg = ( 

469 "Including a StreamRouter into another StreamRouter is not supported " 

470 "and may cause subtle context issues (e.g. message dependencies " 

471 "returning EmptyPlaceholder). " 

472 "Use a regular broker router (e.g. KafkaRouter, RabbitRouter, etc.) " 

473 "for grouping subscribers and include that into the StreamRouter instead. " 

474 "See: https://faststream.ag2.ai/latest/getting-started/integrations/fastapi/#multiple-routers" 

475 ) 

476 raise TypeError(msg)