Coverage for fastapi/encoders.py: 100%

98 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-08 03:53 +0000

1import dataclasses 1abcde

2import datetime 1abcde

3from collections import defaultdict, deque 1abcde

4from decimal import Decimal 1abcde

5from enum import Enum 1abcde

6from ipaddress import ( 1abcde

7 IPv4Address, 

8 IPv4Interface, 

9 IPv4Network, 

10 IPv6Address, 

11 IPv6Interface, 

12 IPv6Network, 

13) 

14from pathlib import Path, PurePath 1abcde

15from re import Pattern 1abcde

16from types import GeneratorType 1abcde

17from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union 1abcde

18from uuid import UUID 1abcde

19 

20from fastapi.types import IncEx 1abcde

21from pydantic import BaseModel 1abcde

22from pydantic.color import Color 1abcde

23from pydantic.networks import AnyUrl, NameEmail 1abcde

24from pydantic.types import SecretBytes, SecretStr 1abcde

25from typing_extensions import Annotated, Doc 1abcde

26 

27from ._compat import PYDANTIC_V2, UndefinedType, Url, _model_dump 1abcde

28 

29 

30# Taken from Pydantic v1 as is 

def isoformat(o: Union[datetime.date, datetime.time]) -> str:
    """Return the ISO 8601 text for a date, datetime, or time value.

    This simply delegates to the object's own ``isoformat()`` method, so the
    exact layout (date only, time only, or combined) depends on the type of
    ``o``.
    """
    iso_text = o.isoformat()
    return iso_text

33 

34 

# Adapted from Pydantic v1 (extended to cope with NaN and Infinity)
# TODO: pv2 should this return strings instead?
def decimal_encoder(dec_value: Decimal) -> Union[int, float]:
    """
    Encodes a Decimal as int if there's no exponent, otherwise float

    This is useful when we use ConstrainedDecimal to represent Numeric(x,0)
    where a integer (but not int typed) is used. Encoding this as a float
    results in failed round-tripping between encode and parse.
    Our Id type is a prime example of this.

    Special values (``NaN``, ``Infinity``, ``-Infinity``) have a non-numeric
    exponent and are always encoded as the corresponding float.

    >>> decimal_encoder(Decimal("1.0"))
    1.0

    >>> decimal_encoder(Decimal("1"))
    1

    >>> decimal_encoder(Decimal("NaN"))
    nan
    """
    exponent = dec_value.as_tuple().exponent
    # For NaN / sNaN / Infinity the exponent is a string marker, not an int;
    # comparing it with 0 would raise TypeError, so only integers with a
    # non-negative exponent take the int path.
    if isinstance(exponent, int) and exponent >= 0:
        return int(dec_value)
    return float(dec_value)

56 

57 

# Default mapping from a Python type to the callable that turns an instance of
# that type into a JSON-compatible value.
#
# jsonable_encoder first looks up the exact type of the object here, and then
# falls back to isinstance checks driven by encoders_by_class_tuples, which is
# derived from this dict in insertion order. Keep the ordering in mind when
# adding entries whose types are subclasses of one another.
ENCODERS_BY_TYPE: Dict[Type[Any], Callable[[Any], Any]] = {
    bytes: lambda o: o.decode(),  # decoded with the default codec
    Color: str,
    datetime.date: isoformat,
    datetime.datetime: isoformat,
    datetime.time: isoformat,
    datetime.timedelta: lambda td: td.total_seconds(),  # duration as seconds
    Decimal: decimal_encoder,
    Enum: lambda o: o.value,
    frozenset: list,
    deque: list,
    GeneratorType: list,  # consumes the generator
    IPv4Address: str,
    IPv4Interface: str,
    IPv4Network: str,
    IPv6Address: str,
    IPv6Interface: str,
    IPv6Network: str,
    NameEmail: str,
    Path: str,
    Pattern: lambda o: o.pattern,  # the source string of the compiled regex
    SecretBytes: str,
    SecretStr: str,
    set: list,
    UUID: str,
    Url: str,
    AnyUrl: str,
}

86 

87 

def generate_encoders_by_class_tuples(
    type_encoder_map: Dict[Any, Callable[[Any], Any]],
) -> Dict[Callable[[Any], Any], Tuple[Any, ...]]:
    """Invert a ``type -> encoder`` mapping into ``encoder -> (types, ...)``.

    Types that share the same encoder callable are collected, in the order
    they appear in ``type_encoder_map``, into a single tuple suitable for use
    as the second argument of ``isinstance``.

    The result is a ``defaultdict(tuple)``, so looking up an unknown encoder
    yields an empty tuple.
    """
    grouped: Dict[Callable[[Any], Any], Tuple[Any, ...]] = defaultdict(tuple)
    for handled_type, encode in type_encoder_map.items():
        grouped[encode] = (*grouped[encode], handled_type)
    return grouped

97 

98 

# Inverted view of ENCODERS_BY_TYPE (encoder -> tuple of types), built once at
# import time; jsonable_encoder iterates it for its isinstance-based fallback.
encoders_by_class_tuples = generate_encoders_by_class_tuples(ENCODERS_BY_TYPE)

100 

101 

def jsonable_encoder(
    obj: Annotated[
        Any,
        Doc(
            """
            The input object to convert to JSON.
            """
        ),
    ],
    include: Annotated[
        Optional[IncEx],
        Doc(
            """
            Pydantic's `include` parameter, passed to Pydantic models to set the
            fields to include.
            """
        ),
    ] = None,
    exclude: Annotated[
        Optional[IncEx],
        Doc(
            """
            Pydantic's `exclude` parameter, passed to Pydantic models to set the
            fields to exclude.
            """
        ),
    ] = None,
    by_alias: Annotated[
        bool,
        Doc(
            """
            Pydantic's `by_alias` parameter, passed to Pydantic models to define if
            the output should use the alias names (when provided) or the Python
            attribute names. In an API, if you set an alias, it's probably because you
            want to use it in the result, so you probably want to leave this set to
            `True`.
            """
        ),
    ] = True,
    exclude_unset: Annotated[
        bool,
        Doc(
            """
            Pydantic's `exclude_unset` parameter, passed to Pydantic models to define
            if it should exclude from the output the fields that were not explicitly
            set (and that only had their default values).
            """
        ),
    ] = False,
    exclude_defaults: Annotated[
        bool,
        Doc(
            """
            Pydantic's `exclude_defaults` parameter, passed to Pydantic models to define
            if it should exclude from the output the fields that had the same default
            value, even when they were explicitly set.
            """
        ),
    ] = False,
    exclude_none: Annotated[
        bool,
        Doc(
            """
            Pydantic's `exclude_none` parameter, passed to Pydantic models to define
            if it should exclude from the output any fields that have a `None` value.
            """
        ),
    ] = False,
    custom_encoder: Annotated[
        Optional[Dict[Any, Callable[[Any], Any]]],
        Doc(
            """
            Pydantic's `custom_encoder` parameter, passed to Pydantic models to define
            a custom encoder.
            """
        ),
    ] = None,
    sqlalchemy_safe: Annotated[
        bool,
        Doc(
            """
            Exclude from the output any fields that start with the name `_sa`.

            This is mainly a hack for compatibility with SQLAlchemy objects, they
            store internal SQLAlchemy-specific state in attributes named with `_sa`,
            and those objects can't (and shouldn't be) serialized to JSON.
            """
        ),
    ] = True,
) -> Any:
    """
    Convert any object to something that can be encoded in JSON.

    This is used internally by FastAPI to make sure anything you return can be
    encoded as JSON before it is sent to the client.

    You can also use it yourself, for example to convert objects before saving them
    in a database that supports only JSON.

    The object is matched against the following cases, in order: custom
    encoders, Pydantic models, dataclasses, enums, paths, JSON primitives,
    dicts, sequence-like containers, the default `ENCODERS_BY_TYPE` encoders,
    and finally `dict(obj)` / `vars(obj)`. If neither of the last two works, a
    `ValueError` carrying both underlying errors is raised.

    Read more about it in the
    [FastAPI docs for JSON Compatible Encoder](https://fastapi.tiangolo.com/tutorial/encoder/).
    """
    custom_encoder = custom_encoder or {}
    if custom_encoder:
        # An exact type match wins over an isinstance match.
        if type(obj) in custom_encoder:
            return custom_encoder[type(obj)](obj)
        else:
            for encoder_type, encoder_instance in custom_encoder.items():
                if isinstance(obj, encoder_type):
                    return encoder_instance(obj)
    # Normalize include/exclude given as other iterables (e.g. lists) to sets.
    if include is not None and not isinstance(include, (set, dict)):
        include = set(include)
    if exclude is not None and not isinstance(exclude, (set, dict)):
        exclude = set(exclude)
    if isinstance(obj, BaseModel):
        # TODO: remove when deprecating Pydantic v1
        encoders: Dict[Any, Any] = {}
        if not PYDANTIC_V2:
            # Copy the model's class-level json_encoders before merging, so the
            # per-call custom encoders never leak into the model's config and
            # affect later, unrelated calls.
            encoders = dict(getattr(obj.__config__, "json_encoders", {}))  # type: ignore[attr-defined]
            if custom_encoder:
                encoders.update(custom_encoder)
        obj_dict = _model_dump(
            obj,
            mode="json",
            include=include,
            exclude=exclude,
            by_alias=by_alias,
            exclude_unset=exclude_unset,
            exclude_none=exclude_none,
            exclude_defaults=exclude_defaults,
        )
        # Models dumped with a "__root__" key are unwrapped to the root value.
        if "__root__" in obj_dict:
            obj_dict = obj_dict["__root__"]
        return jsonable_encoder(
            obj_dict,
            exclude_none=exclude_none,
            exclude_defaults=exclude_defaults,
            # TODO: remove when deprecating Pydantic v1
            custom_encoder=encoders,
            sqlalchemy_safe=sqlalchemy_safe,
        )
    if dataclasses.is_dataclass(obj):
        obj_dict = dataclasses.asdict(obj)
        return jsonable_encoder(
            obj_dict,
            include=include,
            exclude=exclude,
            by_alias=by_alias,
            exclude_unset=exclude_unset,
            exclude_defaults=exclude_defaults,
            exclude_none=exclude_none,
            custom_encoder=custom_encoder,
            sqlalchemy_safe=sqlalchemy_safe,
        )
    if isinstance(obj, Enum):
        return obj.value
    if isinstance(obj, PurePath):
        return str(obj)
    if isinstance(obj, (str, int, float, type(None))):
        return obj
    if isinstance(obj, UndefinedType):
        return None
    if isinstance(obj, dict):
        encoded_dict = {}
        allowed_keys = set(obj.keys())
        if include is not None:
            allowed_keys &= set(include)
        if exclude is not None:
            allowed_keys -= set(exclude)
        for key, value in obj.items():
            if (
                (
                    not sqlalchemy_safe
                    or (not isinstance(key, str))
                    or (not key.startswith("_sa"))
                )
                and (value is not None or not exclude_none)
                and key in allowed_keys
            ):
                # include/exclude are not forwarded here, so they only filter
                # the keys of this dict, not those of nested values.
                encoded_key = jsonable_encoder(
                    key,
                    by_alias=by_alias,
                    exclude_unset=exclude_unset,
                    exclude_none=exclude_none,
                    custom_encoder=custom_encoder,
                    sqlalchemy_safe=sqlalchemy_safe,
                )
                encoded_value = jsonable_encoder(
                    value,
                    by_alias=by_alias,
                    exclude_unset=exclude_unset,
                    exclude_none=exclude_none,
                    custom_encoder=custom_encoder,
                    sqlalchemy_safe=sqlalchemy_safe,
                )
                encoded_dict[encoded_key] = encoded_value
        return encoded_dict
    if isinstance(obj, (list, set, frozenset, GeneratorType, tuple, deque)):
        encoded_list = []
        for item in obj:
            encoded_list.append(
                jsonable_encoder(
                    item,
                    include=include,
                    exclude=exclude,
                    by_alias=by_alias,
                    exclude_unset=exclude_unset,
                    exclude_defaults=exclude_defaults,
                    exclude_none=exclude_none,
                    custom_encoder=custom_encoder,
                    sqlalchemy_safe=sqlalchemy_safe,
                )
            )
        return encoded_list

    # Default encoders: exact type first, then the first isinstance match.
    if type(obj) in ENCODERS_BY_TYPE:
        return ENCODERS_BY_TYPE[type(obj)](obj)
    for encoder, classes_tuple in encoders_by_class_tuples.items():
        if isinstance(obj, classes_tuple):
            return encoder(obj)

    # Last resort: treat the object as a mapping, then as a bag of attributes.
    try:
        data = dict(obj)
    except Exception as e:
        errors: List[Exception] = []
        errors.append(e)
        try:
            data = vars(obj)
        except Exception as e:
            errors.append(e)
            raise ValueError(errors) from e
    return jsonable_encoder(
        data,
        include=include,
        exclude=exclude,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
        exclude_defaults=exclude_defaults,
        exclude_none=exclude_none,
        custom_encoder=custom_encoder,
        sqlalchemy_safe=sqlalchemy_safe,
    )

343 )