Coverage for fastapi/utils.py: 100%
91 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-08 03:53 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-08 03:53 +0000
1import re 1abcde
2import warnings 1abcde
3from dataclasses import is_dataclass 1abcde
4from typing import ( 1abcde
5 TYPE_CHECKING,
6 Any,
7 Dict,
8 MutableMapping,
9 Optional,
10 Set,
11 Type,
12 Union,
13 cast,
14)
15from weakref import WeakKeyDictionary 1abcde
17import fastapi 1abcde
18from fastapi._compat import ( 1abcde
19 PYDANTIC_V2,
20 BaseConfig,
21 ModelField,
22 PydanticSchemaGenerationError,
23 Undefined,
24 UndefinedType,
25 Validator,
26 lenient_issubclass,
27)
28from fastapi.datastructures import DefaultPlaceholder, DefaultType 1abcde
29from pydantic import BaseModel, create_model 1abcde
30from pydantic.fields import FieldInfo 1abcde
31from typing_extensions import Literal 1abcde
33if TYPE_CHECKING: # pragma: nocover 1abcde
34 from .routing import APIRoute
36# Cache for `create_cloned_field`
37_CLONED_TYPES_CACHE: MutableMapping[ 1abcde
38 Type[BaseModel], Type[BaseModel]
39] = WeakKeyDictionary()
42def is_body_allowed_for_status_code(status_code: Union[int, str, None]) -> bool: 1abcde
43 if status_code is None: 1abcde
44 return True 1abcde
45 # Ref: https://github.com/OAI/OpenAPI-Specification/blob/main/versions/3.1.0.md#patterned-fields-1
46 if status_code in { 1abcde
47 "default",
48 "1XX",
49 "2XX",
50 "3XX",
51 "4XX",
52 "5XX",
53 }:
54 return True 1abcde
55 current_status_code = int(status_code) 1abcde
56 return not (current_status_code < 200 or current_status_code in {204, 205, 304}) 1abcde
59def get_path_param_names(path: str) -> Set[str]: 1abcde
60 return set(re.findall("{(.*?)}", path)) 1abcde
63def create_response_field( 1abcde
64 name: str,
65 type_: Type[Any],
66 class_validators: Optional[Dict[str, Validator]] = None,
67 default: Optional[Any] = Undefined,
68 required: Union[bool, UndefinedType] = Undefined,
69 model_config: Type[BaseConfig] = BaseConfig,
70 field_info: Optional[FieldInfo] = None,
71 alias: Optional[str] = None,
72 mode: Literal["validation", "serialization"] = "validation",
73) -> ModelField:
74 """
75 Create a new response field. Raises if type_ is invalid.
76 """
77 class_validators = class_validators or {} 1abcde
78 if PYDANTIC_V2: 1abcde
79 field_info = field_info or FieldInfo( 1abcde
80 annotation=type_, default=default, alias=alias
81 )
82 else:
83 field_info = field_info or FieldInfo() 1abcde
84 kwargs = {"name": name, "field_info": field_info} 1abcde
85 if PYDANTIC_V2: 1abcde
86 kwargs.update({"mode": mode}) 1abcde
87 else:
88 kwargs.update( 1abcde
89 {
90 "type_": type_,
91 "class_validators": class_validators,
92 "default": default,
93 "required": required,
94 "model_config": model_config,
95 "alias": alias,
96 }
97 )
98 try: 1abcde
99 return ModelField(**kwargs) # type: ignore[arg-type] 1abcde
100 except (RuntimeError, PydanticSchemaGenerationError): 1abcde
101 raise fastapi.exceptions.FastAPIError( 1abcde
102 "Invalid args for response field! Hint: "
103 f"check that {type_} is a valid Pydantic field type. "
104 "If you are using a return type annotation that is not a valid Pydantic "
105 "field (e.g. Union[Response, dict, None]) you can disable generating the "
106 "response model from the type annotation with the path operation decorator "
107 "parameter response_model=None. Read more: "
108 "https://fastapi.tiangolo.com/tutorial/response-model/"
109 ) from None
112def create_cloned_field( 1abcde
113 field: ModelField,
114 *,
115 cloned_types: Optional[MutableMapping[Type[BaseModel], Type[BaseModel]]] = None,
116) -> ModelField:
117 if PYDANTIC_V2: 1abcde
118 return field 1abcde
119 # cloned_types caches already cloned types to support recursive models and improve
120 # performance by avoiding unnecessary cloning
121 if cloned_types is None: 1abcde
122 cloned_types = _CLONED_TYPES_CACHE 1abcde
124 original_type = field.type_ 1abcde
125 if is_dataclass(original_type) and hasattr(original_type, "__pydantic_model__"): 1abcde
126 original_type = original_type.__pydantic_model__ 1abcde
127 use_type = original_type 1abcde
128 if lenient_issubclass(original_type, BaseModel): 1abcde
129 original_type = cast(Type[BaseModel], original_type) 1abcde
130 use_type = cloned_types.get(original_type) 1abcde
131 if use_type is None: 1abcde
132 use_type = create_model(original_type.__name__, __base__=original_type) 1abcde
133 cloned_types[original_type] = use_type 1abcde
134 for f in original_type.__fields__.values(): 1abcde
135 use_type.__fields__[f.name] = create_cloned_field( 1abcde
136 f, cloned_types=cloned_types
137 )
138 new_field = create_response_field(name=field.name, type_=use_type) 1abcde
139 new_field.has_alias = field.has_alias # type: ignore[attr-defined] 1abcde
140 new_field.alias = field.alias # type: ignore[misc] 1abcde
141 new_field.class_validators = field.class_validators # type: ignore[attr-defined] 1abcde
142 new_field.default = field.default # type: ignore[misc] 1abcde
143 new_field.required = field.required # type: ignore[misc] 1abcde
144 new_field.model_config = field.model_config # type: ignore[attr-defined] 1abcde
145 new_field.field_info = field.field_info 1abcde
146 new_field.allow_none = field.allow_none # type: ignore[attr-defined] 1abcde
147 new_field.validate_always = field.validate_always # type: ignore[attr-defined] 1abcde
148 if field.sub_fields: # type: ignore[attr-defined] 1abcde
149 new_field.sub_fields = [ # type: ignore[attr-defined] 1abcde
150 create_cloned_field(sub_field, cloned_types=cloned_types)
151 for sub_field in field.sub_fields # type: ignore[attr-defined]
152 ]
153 if field.key_field: # type: ignore[attr-defined] 1abcde
154 new_field.key_field = create_cloned_field( # type: ignore[attr-defined] 1abcde
155 field.key_field, # type: ignore[attr-defined]
156 cloned_types=cloned_types,
157 )
158 new_field.validators = field.validators # type: ignore[attr-defined] 1abcde
159 new_field.pre_validators = field.pre_validators # type: ignore[attr-defined] 1abcde
160 new_field.post_validators = field.post_validators # type: ignore[attr-defined] 1abcde
161 new_field.parse_json = field.parse_json # type: ignore[attr-defined] 1abcde
162 new_field.shape = field.shape # type: ignore[attr-defined] 1abcde
163 new_field.populate_validators() # type: ignore[attr-defined] 1abcde
164 return new_field 1abcde
167def generate_operation_id_for_path( 1abcde
168 *, name: str, path: str, method: str 1abcde
169) -> str: # pragma: nocover 1abcde
170 warnings.warn(
171 "fastapi.utils.generate_operation_id_for_path() was deprecated, "
172 "it is not used internally, and will be removed soon",
173 DeprecationWarning,
174 stacklevel=2,
175 )
176 operation_id = f"{name}{path}"
177 operation_id = re.sub(r"\W", "_", operation_id)
178 operation_id = f"{operation_id}_{method.lower()}"
179 return operation_id
182def generate_unique_id(route: "APIRoute") -> str: 1abcde
183 operation_id = f"{route.name}{route.path_format}" 1abcde
184 operation_id = re.sub(r"\W", "_", operation_id) 1abcde
185 assert route.methods 1abcde
186 operation_id = f"{operation_id}_{list(route.methods)[0].lower()}" 1abcde
187 return operation_id 1abcde
190def deep_dict_update(main_dict: Dict[Any, Any], update_dict: Dict[Any, Any]) -> None: 1abcde
191 for key, value in update_dict.items(): 1abcde
192 if ( 1ab
193 key in main_dict
194 and isinstance(main_dict[key], dict)
195 and isinstance(value, dict)
196 ):
197 deep_dict_update(main_dict[key], value) 1abcde
198 elif ( 1ab
199 key in main_dict
200 and isinstance(main_dict[key], list)
201 and isinstance(update_dict[key], list)
202 ):
203 main_dict[key] = main_dict[key] + update_dict[key] 1abcde
204 else:
205 main_dict[key] = value 1abcde
208def get_value_or_default( 1abcde
209 first_item: Union[DefaultPlaceholder, DefaultType],
210 *extra_items: Union[DefaultPlaceholder, DefaultType],
211) -> Union[DefaultPlaceholder, DefaultType]:
212 """
213 Pass items or `DefaultPlaceholder`s by descending priority.
215 The first one to _not_ be a `DefaultPlaceholder` will be returned.
217 Otherwise, the first item (a `DefaultPlaceholder`) will be returned.
218 """
219 items = (first_item,) + extra_items 1abcde
220 for item in items: 1abcde
221 if not isinstance(item, DefaultPlaceholder): 1abcde
222 return item 1abcde
223 return first_item 1abcde