Coverage for fastapi/utils.py: 100%

91 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-08 03:53 +0000

1import re 1abcde

2import warnings 1abcde

3from dataclasses import is_dataclass 1abcde

4from typing import ( 1abcde

5 TYPE_CHECKING, 

6 Any, 

7 Dict, 

8 MutableMapping, 

9 Optional, 

10 Set, 

11 Type, 

12 Union, 

13 cast, 

14) 

15from weakref import WeakKeyDictionary 1abcde

16 

17import fastapi 1abcde

18from fastapi._compat import ( 1abcde

19 PYDANTIC_V2, 

20 BaseConfig, 

21 ModelField, 

22 PydanticSchemaGenerationError, 

23 Undefined, 

24 UndefinedType, 

25 Validator, 

26 lenient_issubclass, 

27) 

28from fastapi.datastructures import DefaultPlaceholder, DefaultType 1abcde

29from pydantic import BaseModel, create_model 1abcde

30from pydantic.fields import FieldInfo 1abcde

31from typing_extensions import Literal 1abcde

32 

33if TYPE_CHECKING: # pragma: nocover 1abcde

34 from .routing import APIRoute 

35 

36# Cache for `create_cloned_field` 

37_CLONED_TYPES_CACHE: MutableMapping[ 1abcde

38 Type[BaseModel], Type[BaseModel] 

39] = WeakKeyDictionary() 

40 

41 

42def is_body_allowed_for_status_code(status_code: Union[int, str, None]) -> bool: 1abcde

43 if status_code is None: 1abcde

44 return True 1abcde

45 # Ref: https://github.com/OAI/OpenAPI-Specification/blob/main/versions/3.1.0.md#patterned-fields-1 

46 if status_code in { 1abcde

47 "default", 

48 "1XX", 

49 "2XX", 

50 "3XX", 

51 "4XX", 

52 "5XX", 

53 }: 

54 return True 1abcde

55 current_status_code = int(status_code) 1abcde

56 return not (current_status_code < 200 or current_status_code in {204, 205, 304}) 1abcde

57 

58 

59def get_path_param_names(path: str) -> Set[str]: 1abcde

60 return set(re.findall("{(.*?)}", path)) 1abcde

61 

62 

63def create_response_field( 1abcde

64 name: str, 

65 type_: Type[Any], 

66 class_validators: Optional[Dict[str, Validator]] = None, 

67 default: Optional[Any] = Undefined, 

68 required: Union[bool, UndefinedType] = Undefined, 

69 model_config: Type[BaseConfig] = BaseConfig, 

70 field_info: Optional[FieldInfo] = None, 

71 alias: Optional[str] = None, 

72 mode: Literal["validation", "serialization"] = "validation", 

73) -> ModelField: 

74 """ 

75 Create a new response field. Raises if type_ is invalid. 

76 """ 

77 class_validators = class_validators or {} 1abcde

78 if PYDANTIC_V2: 1abcde

79 field_info = field_info or FieldInfo( 1abcde

80 annotation=type_, default=default, alias=alias 

81 ) 

82 else: 

83 field_info = field_info or FieldInfo() 1abcde

84 kwargs = {"name": name, "field_info": field_info} 1abcde

85 if PYDANTIC_V2: 1abcde

86 kwargs.update({"mode": mode}) 1abcde

87 else: 

88 kwargs.update( 1abcde

89 { 

90 "type_": type_, 

91 "class_validators": class_validators, 

92 "default": default, 

93 "required": required, 

94 "model_config": model_config, 

95 "alias": alias, 

96 } 

97 ) 

98 try: 1abcde

99 return ModelField(**kwargs) # type: ignore[arg-type] 1abcde

100 except (RuntimeError, PydanticSchemaGenerationError): 1abcde

101 raise fastapi.exceptions.FastAPIError( 1abcde

102 "Invalid args for response field! Hint: " 

103 f"check that {type_} is a valid Pydantic field type. " 

104 "If you are using a return type annotation that is not a valid Pydantic " 

105 "field (e.g. Union[Response, dict, None]) you can disable generating the " 

106 "response model from the type annotation with the path operation decorator " 

107 "parameter response_model=None. Read more: " 

108 "https://fastapi.tiangolo.com/tutorial/response-model/" 

109 ) from None 

110 

111 

112def create_cloned_field( 1abcde

113 field: ModelField, 

114 *, 

115 cloned_types: Optional[MutableMapping[Type[BaseModel], Type[BaseModel]]] = None, 

116) -> ModelField: 

117 if PYDANTIC_V2: 1abcde

118 return field 1abcde

119 # cloned_types caches already cloned types to support recursive models and improve 

120 # performance by avoiding unnecessary cloning 

121 if cloned_types is None: 1abcde

122 cloned_types = _CLONED_TYPES_CACHE 1abcde

123 

124 original_type = field.type_ 1abcde

125 if is_dataclass(original_type) and hasattr(original_type, "__pydantic_model__"): 1abcde

126 original_type = original_type.__pydantic_model__ 1abcde

127 use_type = original_type 1abcde

128 if lenient_issubclass(original_type, BaseModel): 1abcde

129 original_type = cast(Type[BaseModel], original_type) 1abcde

130 use_type = cloned_types.get(original_type) 1abcde

131 if use_type is None: 1abcde

132 use_type = create_model(original_type.__name__, __base__=original_type) 1abcde

133 cloned_types[original_type] = use_type 1abcde

134 for f in original_type.__fields__.values(): 1abcde

135 use_type.__fields__[f.name] = create_cloned_field( 1abcde

136 f, cloned_types=cloned_types 

137 ) 

138 new_field = create_response_field(name=field.name, type_=use_type) 1abcde

139 new_field.has_alias = field.has_alias # type: ignore[attr-defined] 1abcde

140 new_field.alias = field.alias # type: ignore[misc] 1abcde

141 new_field.class_validators = field.class_validators # type: ignore[attr-defined] 1abcde

142 new_field.default = field.default # type: ignore[misc] 1abcde

143 new_field.required = field.required # type: ignore[misc] 1abcde

144 new_field.model_config = field.model_config # type: ignore[attr-defined] 1abcde

145 new_field.field_info = field.field_info 1abcde

146 new_field.allow_none = field.allow_none # type: ignore[attr-defined] 1abcde

147 new_field.validate_always = field.validate_always # type: ignore[attr-defined] 1abcde

148 if field.sub_fields: # type: ignore[attr-defined] 1abcde

149 new_field.sub_fields = [ # type: ignore[attr-defined] 1abcde

150 create_cloned_field(sub_field, cloned_types=cloned_types) 

151 for sub_field in field.sub_fields # type: ignore[attr-defined] 

152 ] 

153 if field.key_field: # type: ignore[attr-defined] 1abcde

154 new_field.key_field = create_cloned_field( # type: ignore[attr-defined] 1abcde

155 field.key_field, # type: ignore[attr-defined] 

156 cloned_types=cloned_types, 

157 ) 

158 new_field.validators = field.validators # type: ignore[attr-defined] 1abcde

159 new_field.pre_validators = field.pre_validators # type: ignore[attr-defined] 1abcde

160 new_field.post_validators = field.post_validators # type: ignore[attr-defined] 1abcde

161 new_field.parse_json = field.parse_json # type: ignore[attr-defined] 1abcde

162 new_field.shape = field.shape # type: ignore[attr-defined] 1abcde

163 new_field.populate_validators() # type: ignore[attr-defined] 1abcde

164 return new_field 1abcde

165 

166 

167def generate_operation_id_for_path( 1abcde

168 *, name: str, path: str, method: str 1abcde

169) -> str: # pragma: nocover 1abcde

170 warnings.warn( 

171 "fastapi.utils.generate_operation_id_for_path() was deprecated, " 

172 "it is not used internally, and will be removed soon", 

173 DeprecationWarning, 

174 stacklevel=2, 

175 ) 

176 operation_id = f"{name}{path}" 

177 operation_id = re.sub(r"\W", "_", operation_id) 

178 operation_id = f"{operation_id}_{method.lower()}" 

179 return operation_id 

180 

181 

182def generate_unique_id(route: "APIRoute") -> str: 1abcde

183 operation_id = f"{route.name}{route.path_format}" 1abcde

184 operation_id = re.sub(r"\W", "_", operation_id) 1abcde

185 assert route.methods 1abcde

186 operation_id = f"{operation_id}_{list(route.methods)[0].lower()}" 1abcde

187 return operation_id 1abcde

188 

189 

190def deep_dict_update(main_dict: Dict[Any, Any], update_dict: Dict[Any, Any]) -> None: 1abcde

191 for key, value in update_dict.items(): 1abcde

192 if ( 1ab

193 key in main_dict 

194 and isinstance(main_dict[key], dict) 

195 and isinstance(value, dict) 

196 ): 

197 deep_dict_update(main_dict[key], value) 1abcde

198 elif ( 1ab

199 key in main_dict 

200 and isinstance(main_dict[key], list) 

201 and isinstance(update_dict[key], list) 

202 ): 

203 main_dict[key] = main_dict[key] + update_dict[key] 1abcde

204 else: 

205 main_dict[key] = value 1abcde

206 

207 

208def get_value_or_default( 1abcde

209 first_item: Union[DefaultPlaceholder, DefaultType], 

210 *extra_items: Union[DefaultPlaceholder, DefaultType], 

211) -> Union[DefaultPlaceholder, DefaultType]: 

212 """ 

213 Pass items or `DefaultPlaceholder`s by descending priority. 

214 

215 The first one to _not_ be a `DefaultPlaceholder` will be returned. 

216 

217 Otherwise, the first item (a `DefaultPlaceholder`) will be returned. 

218 """ 

219 items = (first_item,) + extra_items 1abcde

220 for item in items: 1abcde

221 if not isinstance(item, DefaultPlaceholder): 1abcde

222 return item 1abcde

223 return first_item 1abcde