Coverage for faststream / _internal / fastapi / route.py: 94%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2import inspect 

3from collections.abc import Awaitable, Callable, Iterable 

4from contextlib import AsyncExitStack 

5from functools import wraps 

6from itertools import dropwhile 

7from typing import ( 

8 TYPE_CHECKING, 

9 Any, 

10 Optional, 

11 Union, 

12) 

13 

14from fast_depends.dependencies import Dependant 

15from fastapi.routing import run_endpoint_function, serialize_response 

16from starlette.requests import Request 

17 

18from faststream._internal.context import Context, ContextRepo 

19from faststream._internal.types import P_HandlerParams, T_HandlerReturn 

20from faststream.exceptions import SetupError 

21from faststream.response import Response, ensure_response 

22 

23from ._compat import ( 

24 FASTAPI_V106, 

25 FASTAPI_V121, 

26 create_response_field, 

27 raise_fastapi_validation_error, 

28 solve_faststream_dependency, 

29) 

30from .get_dependant import ( 

31 get_fastapi_native_dependant, 

32 has_forbidden_types, 

33 is_faststream_decorated, 

34 mark_faststream_decorated, 

35) 

36 

37if TYPE_CHECKING: 

38 from fastapi import params 

39 from fastapi._compat import ModelField 

40 from fastapi.dependencies.models import Dependant as FastAPIDependant 

41 from fastapi.types import IncEx 

42 

43 from faststream.message import StreamMessage as NativeMessage 

44 

45 from .config import FastAPIConfig 

46 

47 

48class StreamMessage(Request): 

49 """A class to represent a stream message.""" 

50 

51 scope: "dict[str, Any]" 

52 _cookies: "dict[str, Any]" 

53 _headers: "dict[str, Any]" # type: ignore[assignment] 

54 _body: Union["dict[str, Any]", list[Any]] # type: ignore[assignment] 

55 _query_params: "dict[str, Any]" # type: ignore[assignment] 

56 

57 def __init__( 

58 self, 

59 *, 

60 body: Union["dict[str, Any]", list[Any]], 

61 headers: "dict[str, Any]", 

62 path: "dict[str, Any]", 

63 ) -> None: 

64 """Initialize a class instance.""" 

65 self._headers = headers 

66 self._body = body 

67 self._query_params = path 

68 

69 self.scope = {"path_params": self._query_params} 

70 self._cookies = {} 

71 

72 

73def wrap_callable_to_fastapi_compatible( 

74 user_callable: Callable[P_HandlerParams, T_HandlerReturn], 

75 *, 

76 fastapi_config: "FastAPIConfig", 

77 dependencies: Iterable["params.Depends"], 

78 response_model: Any, 

79 response_model_include: Optional["IncEx"], 

80 response_model_exclude: Optional["IncEx"], 

81 response_model_by_alias: bool, 

82 response_model_exclude_unset: bool, 

83 response_model_exclude_defaults: bool, 

84 response_model_exclude_none: bool, 

85 context: "ContextRepo", 

86) -> Callable[["NativeMessage[Any]"], Awaitable[Any]]: 

87 if has_forbidden_types(user_callable, (Dependant,)): 

88 msg = ( 

89 f"Incorrect `faststream.Depends` usage at `{user_callable.__name__}`. " 

90 "For FastAPI integration use `fastapi.Depends` instead." 

91 ) 

92 raise SetupError(msg) 

93 

94 if has_forbidden_types(user_callable, (Context,)): 

95 msg = ( 

96 f"Incorrect `faststream.Context` usage at `{user_callable.__name__}`. " 

97 "For FastAPI integration use `faststream.[broker].fastapi.Context` instead." 

98 ) 

99 raise SetupError(msg) 

100 

101 if is_faststream_decorated(user_callable): 

102 return user_callable # type: ignore[return-value] 

103 

104 if response_model: 104 ↛ 105line 104 didn't jump to line 105 because the condition on line 104 was never true

105 response_field = create_response_field( 

106 name="ResponseModel", 

107 type_=response_model, 

108 mode="serialization", 

109 ) 

110 else: 

111 response_field = None 

112 

113 parsed_callable = build_faststream_to_fastapi_parser( 

114 dependent=get_fastapi_native_dependant( 

115 user_callable, 

116 dependencies=list(dependencies), 

117 ), 

118 fastapi_config=fastapi_config, 

119 response_field=response_field, 

120 response_model_include=response_model_include, 

121 response_model_exclude=response_model_exclude, 

122 response_model_by_alias=response_model_by_alias, 

123 response_model_exclude_unset=response_model_exclude_unset, 

124 response_model_exclude_defaults=response_model_exclude_defaults, 

125 response_model_exclude_none=response_model_exclude_none, 

126 context=context, 

127 ) 

128 

129 mark_faststream_decorated(parsed_callable) 

130 return wraps(user_callable)(parsed_callable) 

131 

132 

133def build_faststream_to_fastapi_parser( 

134 *, 

135 dependent: "FastAPIDependant", 

136 fastapi_config: "FastAPIConfig", 

137 context: "ContextRepo", 

138 response_field: Optional["ModelField"], 

139 response_model_include: Optional["IncEx"], 

140 response_model_exclude: Optional["IncEx"], 

141 response_model_by_alias: bool, 

142 response_model_exclude_unset: bool, 

143 response_model_exclude_defaults: bool, 

144 response_model_exclude_none: bool, 

145) -> Callable[["NativeMessage[Any]"], Awaitable[Any]]: 

146 """Creates a session for handling requests.""" 

147 assert dependent.call 

148 

149 consume = make_fastapi_execution( 

150 dependent=dependent, 

151 fastapi_config=fastapi_config, 

152 response_field=response_field, 

153 response_model_include=response_model_include, 

154 response_model_exclude=response_model_exclude, 

155 response_model_by_alias=response_model_by_alias, 

156 response_model_exclude_unset=response_model_exclude_unset, 

157 response_model_exclude_defaults=response_model_exclude_defaults, 

158 response_model_exclude_none=response_model_exclude_none, 

159 ) 

160 

161 dependencies_names = tuple(i.name for i in dependent.dependencies) 

162 

163 first_arg = next( 

164 dropwhile( 

165 lambda i: i in dependencies_names, 

166 inspect.signature(dependent.call).parameters, 

167 ), 

168 None, 

169 ) 

170 

171 async def parsed_consumer(message: "NativeMessage[Any]") -> Any: 

172 """Wrapper, that parser FastStream message to FastAPI compatible one.""" 

173 body = await message.decode() 

174 

175 fastapi_body: dict[str, Any] | list[Any] 

176 if first_arg is not None: 

177 if isinstance(body, dict): 

178 path = fastapi_body = body or {} 

179 elif isinstance(body, list): 

180 fastapi_body, path = body, {} 

181 else: 

182 path = fastapi_body = {first_arg: body} 

183 

184 stream_message = StreamMessage( 

185 body=fastapi_body, 

186 headers={"context__": context, **message.headers}, 

187 path={**path, **message.path}, 

188 ) 

189 

190 else: 

191 stream_message = StreamMessage( 

192 body={}, 

193 headers={"context__": context}, 

194 path={}, 

195 ) 

196 

197 return await consume(stream_message, message) 

198 

199 return parsed_consumer 

200 

201 

202def make_fastapi_execution( 

203 *, 

204 dependent: "FastAPIDependant", 

205 fastapi_config: "FastAPIConfig", 

206 response_field: Optional["ModelField"], 

207 response_model_include: Optional["IncEx"], 

208 response_model_exclude: Optional["IncEx"], 

209 response_model_by_alias: bool, 

210 response_model_exclude_unset: bool, 

211 response_model_exclude_defaults: bool, 

212 response_model_exclude_none: bool, 

213) -> Callable[ 

214 ["StreamMessage", "NativeMessage[Any]"], 

215 Awaitable[Response], 

216]: 

217 """Creates a FastAPI application.""" 

218 is_coroutine = asyncio.iscoroutinefunction(dependent.call) 

219 

220 async def app( 

221 request: "StreamMessage", 

222 raw_message: "NativeMessage[Any]", # to support BackgroundTasks by middleware 

223 ) -> Response: 

224 """Consume StreamMessage and return user function result.""" 

225 async with AsyncExitStack() as stack: 

226 kwargs = {} 

227 if FASTAPI_V121: 227 ↛ 233line 227 didn't jump to line 233 because the condition on line 227 was always true

228 request.scope["fastapi_inner_astack"] = stack 

229 function_stack = AsyncExitStack() 

230 await stack.enter_async_context(function_stack) 

231 request.scope["fastapi_function_astack"] = function_stack 

232 

233 if FASTAPI_V106: 233 ↛ 237line 233 didn't jump to line 237 because the condition on line 233 was always true

234 kwargs = {"async_exit_stack": stack} 

235 

236 else: 

237 request.scope["fastapi_astack"] = stack 

238 

239 request.scope["app"] = fastapi_config.application 

240 

241 solved_result = await solve_faststream_dependency( 

242 request=request, 

243 dependant=dependent, 

244 dependency_overrides_provider=fastapi_config.dependency_overrides_provider, 

245 **kwargs, 

246 ) 

247 

248 raw_message.background = solved_result.background_tasks # type: ignore[attr-defined] 

249 

250 if solved_result.errors: 

251 raise_fastapi_validation_error(solved_result.errors, request._body) # type: ignore[arg-type] 

252 

253 function_result = await run_endpoint_function( 

254 dependant=dependent, 

255 values=solved_result.values, 

256 is_coroutine=is_coroutine, 

257 ) 

258 

259 response = ensure_response(function_result) 

260 

261 response.body = await serialize_response( 

262 response_content=response.body, 

263 field=response_field, 

264 include=response_model_include, 

265 exclude=response_model_exclude, 

266 by_alias=response_model_by_alias, 

267 exclude_unset=response_model_exclude_unset, 

268 exclude_defaults=response_model_exclude_defaults, 

269 exclude_none=response_model_exclude_none, 

270 is_coroutine=is_coroutine, 

271 ) 

272 

273 return response 

274 

275 msg = "unreachable" 

276 raise AssertionError(msg) 

277 

278 return app