Coverage for faststream / asgi / handlers.py: 95%

38 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import logging 

2from collections.abc import Callable, Sequence 

3from typing import TYPE_CHECKING, Any, Optional, Union, overload 

4 

5from fast_depends.exceptions import ValidationError as FDValidationError 

6 

7from faststream import apply_types 

8from faststream._internal.di.config import FastDependsConfig 

9from faststream._internal.utils.functions import to_async 

10 

11from .request import AsgiRequest 

12from .response import AsgiResponse 

13 

14if TYPE_CHECKING: 

15 from faststream._internal.basic_types import LoggerProto 

16 from faststream.specification.schema import Tag, TagDict 

17 

18 from .types import ASGIApp, Receive, Scope, Send, UserApp 

19 

20 

21class HttpHandler: 

22 def __init__( 

23 self, 

24 func: "UserApp", 

25 *, 

26 include_in_schema: bool = True, 

27 description: str | None = None, 

28 methods: Sequence[str] | None = None, 

29 tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None, 

30 unique_id: str | None = None, 

31 fd_config: FastDependsConfig | None = None, 

32 ) -> None: 

33 self.__original_func = func 

34 self.func = func 

35 self.methods = methods or () 

36 self.include_in_schema = include_in_schema 

37 self.description = description or func.__doc__ 

38 self.tags = tags 

39 self.unique_id = unique_id 

40 self.fd_config = fd_config or FastDependsConfig() 

41 self.logger: LoggerProto | None = None 

42 

43 async def __call__(self, scope: "Scope", receive: "Receive", send: "Send") -> None: 

44 if scope["method"] not in self.methods: 

45 response: ASGIApp = _get_method_not_allowed_response(self.methods) 

46 

47 else: 

48 with ( 

49 self.fd_config.context.scope( 

50 "request", AsgiRequest(scope, receive, send) 

51 ), 

52 self.fd_config.context.scope( 

53 "logger", 

54 self.logger, 

55 ), 

56 ): 

57 try: 

58 response = await self.func(scope) 

59 except FDValidationError: 

60 if self.logger is not None: 

61 message = "Validation error" 

62 self.logger.log(logging.ERROR, message, exc_info=True) 

63 response = AsgiResponse( 

64 body=b"Validation error", 

65 status_code=422, 

66 ) 

67 except Exception: 

68 if self.logger is not None: 

69 self.logger.log( 

70 logging.ERROR, 

71 "Exception occurred while processing request", 

72 exc_info=True, 

73 ) 

74 response = AsgiResponse( 

75 body=b"Internal Server Error", status_code=500 

76 ) 

77 await response(scope, receive, send) 

78 

79 def update_fd_config(self, config: FastDependsConfig) -> None: 

80 self.fd_config = config | self.fd_config 

81 self.func = apply_types( 

82 to_async(self.__original_func), context__=self.fd_config.context 

83 ) 

84 

85 def set_logger(self, logger: "LoggerProto | None") -> None: 

86 self.logger = logger 

87 

88 

89class GetHandler(HttpHandler): 

90 def __init__( 

91 self, 

92 func: "UserApp", 

93 *, 

94 include_in_schema: bool = True, 

95 description: str | None = None, 

96 tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None, 

97 unique_id: str | None = None, 

98 ) -> None: 

99 super().__init__( 

100 func, 

101 include_in_schema=include_in_schema, 

102 description=description, 

103 methods=("GET", "HEAD"), 

104 tags=tags, 

105 unique_id=unique_id, 

106 ) 

107 

108 

109@overload 

110def get( 

111 func: "UserApp", 

112 *, 

113 include_in_schema: bool = True, 

114 description: str | None = None, 

115 tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None, 

116 unique_id: str | None = None, 

117) -> "GetHandler": ... 

118 

119 

120@overload 

121def get( 

122 func: None = None, 

123 *, 

124 include_in_schema: bool = True, 

125 description: str | None = None, 

126 tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None, 

127 unique_id: str | None = None, 

128) -> Callable[["UserApp"], "GetHandler"]: ... 

129 

130 

131def get( 

132 func: Optional["UserApp"] = None, 

133 *, 

134 include_in_schema: bool = True, 

135 description: str | None = None, 

136 tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None, 

137 unique_id: str | None = None, 

138) -> Union[Callable[["UserApp"], "GetHandler"], "GetHandler"]: 

139 def decorator(inner_func: "UserApp") -> "GetHandler": 

140 return GetHandler( 

141 inner_func, 

142 include_in_schema=include_in_schema, 

143 description=description, 

144 tags=tags, 

145 unique_id=unique_id, 

146 ) 

147 

148 if func is None: 

149 return decorator 

150 

151 return decorator(func) 

152 

153 

154class PostHandler(HttpHandler): 

155 def __init__( 

156 self, 

157 func: "UserApp", 

158 *, 

159 include_in_schema: bool = True, 

160 description: str | None = None, 

161 tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None, 

162 unique_id: str | None = None, 

163 ) -> None: 

164 super().__init__( 

165 func, 

166 include_in_schema=include_in_schema, 

167 description=description, 

168 methods=("POST", "HEAD"), 

169 tags=tags, 

170 unique_id=unique_id, 

171 ) 

172 

173 

174@overload 

175def post( 

176 func: "UserApp", 

177 *, 

178 include_in_schema: bool = True, 

179 description: str | None = None, 

180 tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None, 

181 unique_id: str | None = None, 

182) -> "PostHandler": ... 

183 

184 

185@overload 

186def post( 

187 func: None = None, 

188 *, 

189 include_in_schema: bool = True, 

190 description: str | None = None, 

191 tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None, 

192 unique_id: str | None = None, 

193) -> Callable[["UserApp"], "PostHandler"]: ... 

194 

195 

196def post( 

197 func: Optional["UserApp"] = None, 

198 *, 

199 include_in_schema: bool = True, 

200 description: str | None = None, 

201 tags: Sequence[Union["Tag", "TagDict", dict[str, Any]]] | None = None, 

202 unique_id: str | None = None, 

203) -> Union[Callable[["UserApp"], "PostHandler"], "PostHandler"]: 

204 def decorator(inner_func: "UserApp") -> "PostHandler": 

205 return PostHandler( 

206 inner_func, 

207 include_in_schema=include_in_schema, 

208 description=description, 

209 tags=tags, 

210 unique_id=unique_id, 

211 ) 

212 

213 if func is None: 

214 return decorator 

215 

216 return decorator(func) 

217 

218 

219def _get_method_not_allowed_response(methods: Sequence[str]) -> AsgiResponse: 

220 return AsgiResponse( 

221 body=b"Method Not Allowed", 

222 status_code=405, 

223 headers={ 

224 "Allow": ", ".join(methods), 

225 }, 

226 )