Coverage for fastapi/_compat.py: 100%
300 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-08 03:53 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-08 03:53 +0000
1from collections import deque 1abcde
2from copy import copy 1abcde
3from dataclasses import dataclass, is_dataclass 1abcde
4from enum import Enum 1abcde
5from typing import ( 1abcde
6 Any,
7 Callable,
8 Deque,
9 Dict,
10 FrozenSet,
11 List,
12 Mapping,
13 Sequence,
14 Set,
15 Tuple,
16 Type,
17 Union,
18)
20from fastapi.exceptions import RequestErrorModel 1abcde
21from fastapi.types import IncEx, ModelNameMap, UnionType 1abcde
22from pydantic import BaseModel, create_model 1abcde
23from pydantic.version import VERSION as P_VERSION 1abcde
24from starlette.datastructures import UploadFile 1abcde
25from typing_extensions import Annotated, Literal, get_args, get_origin 1abcde
27# Reassign variable to make it reexported for mypy
28PYDANTIC_VERSION = P_VERSION 1abcde
29PYDANTIC_V2 = PYDANTIC_VERSION.startswith("2.") 1abcde
32sequence_annotation_to_type = { 1abcde
33 Sequence: list,
34 List: list,
35 list: list,
36 Tuple: tuple,
37 tuple: tuple,
38 Set: set,
39 set: set,
40 FrozenSet: frozenset,
41 frozenset: frozenset,
42 Deque: deque,
43 deque: deque,
44}
46sequence_types = tuple(sequence_annotation_to_type.keys()) 1abcde
48if PYDANTIC_V2: 1abcde
49 from pydantic import PydanticSchemaGenerationError as PydanticSchemaGenerationError 1abcde
50 from pydantic import TypeAdapter 1abcde
51 from pydantic import ValidationError as ValidationError 1abcde
52 from pydantic._internal._schema_generation_shared import ( # type: ignore[attr-defined] 1abcde
53 GetJsonSchemaHandler as GetJsonSchemaHandler,
54 )
55 from pydantic._internal._typing_extra import eval_type_lenient 1abcde
56 from pydantic._internal._utils import lenient_issubclass as lenient_issubclass 1abcde
57 from pydantic.fields import FieldInfo 1abcde
58 from pydantic.json_schema import GenerateJsonSchema as GenerateJsonSchema 1abcde
59 from pydantic.json_schema import JsonSchemaValue as JsonSchemaValue 1abcde
60 from pydantic_core import CoreSchema as CoreSchema 1abcde
61 from pydantic_core import PydanticUndefined, PydanticUndefinedType 1abcde
62 from pydantic_core import Url as Url 1abcde
64 try: 1abcde
65 from pydantic_core.core_schema import ( 1abcde
66 with_info_plain_validator_function as with_info_plain_validator_function,
67 )
68 except ImportError: # pragma: no cover
69 from pydantic_core.core_schema import (
70 general_plain_validator_function as with_info_plain_validator_function, # noqa: F401
71 )
73 Required = PydanticUndefined 1abcde
74 Undefined = PydanticUndefined 1abcde
75 UndefinedType = PydanticUndefinedType 1abcde
76 evaluate_forwardref = eval_type_lenient 1abcde
77 Validator = Any 1abcde
79 class BaseConfig: 1abcde
80 pass 1abcde
82 class ErrorWrapper(Exception): 1abcde
83 pass 1abcde
85 @dataclass 1abcde
86 class ModelField: 1abcde
87 field_info: FieldInfo 1abcde
88 name: str 1abcde
89 mode: Literal["validation", "serialization"] = "validation" 1abcde
91 @property 1abcde
92 def alias(self) -> str: 1abcde
93 a = self.field_info.alias 1abcde
94 return a if a is not None else self.name 1abcde
96 @property 1abcde
97 def required(self) -> bool: 1abcde
98 return self.field_info.is_required() 1abcde
100 @property 1abcde
101 def default(self) -> Any: 1abcde
102 return self.get_default() 1abcde
104 @property 1abcde
105 def type_(self) -> Any: 1abcde
106 return self.field_info.annotation 1abcde
108 def __post_init__(self) -> None: 1abcde
109 self._type_adapter: TypeAdapter[Any] = TypeAdapter( 1abcde
110 Annotated[self.field_info.annotation, self.field_info]
111 )
113 def get_default(self) -> Any: 1abcde
114 if self.field_info.is_required(): 1abcde
115 return Undefined 1abcde
116 return self.field_info.get_default(call_default_factory=True) 1abcde
118 def validate( 1abcde
119 self,
120 value: Any,
121 values: Dict[str, Any] = {}, # noqa: B006
122 *,
123 loc: Tuple[Union[int, str], ...] = (),
124 ) -> Tuple[Any, Union[List[Dict[str, Any]], None]]:
125 try: 1abcde
126 return ( 1abcde
127 self._type_adapter.validate_python(value, from_attributes=True),
128 None,
129 )
130 except ValidationError as exc: 1abcde
131 return None, _regenerate_error_with_loc( 1abcde
132 errors=exc.errors(include_url=False), loc_prefix=loc
133 )
135 def serialize( 1abcde
136 self,
137 value: Any,
138 *,
139 mode: Literal["json", "python"] = "json",
140 include: Union[IncEx, None] = None,
141 exclude: Union[IncEx, None] = None,
142 by_alias: bool = True,
143 exclude_unset: bool = False,
144 exclude_defaults: bool = False,
145 exclude_none: bool = False,
146 ) -> Any:
147 # What calls this code passes a value that already called
148 # self._type_adapter.validate_python(value)
149 return self._type_adapter.dump_python( 1abcde
150 value,
151 mode=mode,
152 include=include,
153 exclude=exclude,
154 by_alias=by_alias,
155 exclude_unset=exclude_unset,
156 exclude_defaults=exclude_defaults,
157 exclude_none=exclude_none,
158 )
160 def __hash__(self) -> int: 1abcde
161 # Each ModelField is unique for our purposes, to allow making a dict from
162 # ModelField to its JSON Schema.
163 return id(self) 1abcde
165 def get_annotation_from_field_info( 1abcde
166 annotation: Any, field_info: FieldInfo, field_name: str
167 ) -> Any:
168 return annotation 1abcde
170 def _normalize_errors(errors: Sequence[Any]) -> List[Dict[str, Any]]: 1abcde
171 return errors # type: ignore[return-value] 1abcde
173 def _model_rebuild(model: Type[BaseModel]) -> None: 1abcde
174 model.model_rebuild() 1abcde
176 def _model_dump( 1abcde
177 model: BaseModel, mode: Literal["json", "python"] = "json", **kwargs: Any
178 ) -> Any:
179 return model.model_dump(mode=mode, **kwargs) 1abcde
181 def _get_model_config(model: BaseModel) -> Any: 1abcde
182 return model.model_config 1abcde
184 def get_schema_from_model_field( 1abcde
185 *,
186 field: ModelField,
187 schema_generator: GenerateJsonSchema,
188 model_name_map: ModelNameMap,
189 field_mapping: Dict[
190 Tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue
191 ],
192 separate_input_output_schemas: bool = True,
193 ) -> Dict[str, Any]:
194 override_mode: Union[Literal["validation"], None] = ( 1abcde
195 None if separate_input_output_schemas else "validation"
196 )
197 # This expects that GenerateJsonSchema was already used to generate the definitions
198 json_schema = field_mapping[(field, override_mode or field.mode)] 1abcde
199 if "$ref" not in json_schema: 1abcde
200 # TODO remove when deprecating Pydantic v1
201 # Ref: https://github.com/pydantic/pydantic/blob/d61792cc42c80b13b23e3ffa74bc37ec7c77f7d1/pydantic/schema.py#L207
202 json_schema["title"] = ( 1abcde
203 field.field_info.title or field.alias.title().replace("_", " ")
204 )
205 return json_schema 1abcde
207 def get_compat_model_name_map(fields: List[ModelField]) -> ModelNameMap: 1abcde
208 return {} 1abcde
210 def get_definitions( 1abcde
211 *,
212 fields: List[ModelField],
213 schema_generator: GenerateJsonSchema,
214 model_name_map: ModelNameMap,
215 separate_input_output_schemas: bool = True,
216 ) -> Tuple[
217 Dict[
218 Tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue
219 ],
220 Dict[str, Dict[str, Any]],
221 ]:
222 override_mode: Union[Literal["validation"], None] = ( 1abcde
223 None if separate_input_output_schemas else "validation"
224 )
225 inputs = [ 1abcde
226 (field, override_mode or field.mode, field._type_adapter.core_schema)
227 for field in fields
228 ]
229 field_mapping, definitions = schema_generator.generate_definitions( 1abcde
230 inputs=inputs
231 )
232 return field_mapping, definitions # type: ignore[return-value] 1abcde
234 def is_scalar_field(field: ModelField) -> bool: 1abcde
235 from fastapi import params 1abcde
237 return field_annotation_is_scalar( 1abcde
238 field.field_info.annotation
239 ) and not isinstance(field.field_info, params.Body)
241 def is_sequence_field(field: ModelField) -> bool: 1abcde
242 return field_annotation_is_sequence(field.field_info.annotation) 1abcde
244 def is_scalar_sequence_field(field: ModelField) -> bool: 1abcde
245 return field_annotation_is_scalar_sequence(field.field_info.annotation) 1abcde
247 def is_bytes_field(field: ModelField) -> bool: 1abcde
248 return is_bytes_or_nonable_bytes_annotation(field.type_) 1abcde
250 def is_bytes_sequence_field(field: ModelField) -> bool: 1abcde
251 return is_bytes_sequence_annotation(field.type_) 1abcde
253 def copy_field_info(*, field_info: FieldInfo, annotation: Any) -> FieldInfo: 1abcde
254 cls = type(field_info) 1abcde
255 merged_field_info = cls.from_annotation(annotation) 1abcde
256 new_field_info = copy(field_info) 1abcde
257 new_field_info.metadata = merged_field_info.metadata 1abcde
258 new_field_info.annotation = merged_field_info.annotation 1abcde
259 return new_field_info 1abcde
261 def serialize_sequence_value(*, field: ModelField, value: Any) -> Sequence[Any]: 1abcde
262 origin_type = ( 1abcde
263 get_origin(field.field_info.annotation) or field.field_info.annotation
264 )
265 assert issubclass(origin_type, sequence_types) # type: ignore[arg-type] 1abcde
266 return sequence_annotation_to_type[origin_type](value) # type: ignore[no-any-return] 1abcde
268 def get_missing_field_error(loc: Tuple[str, ...]) -> Dict[str, Any]: 1abcde
269 error = ValidationError.from_exception_data( 1abcde
270 "Field required", [{"type": "missing", "loc": loc, "input": {}}]
271 ).errors(include_url=False)[0]
272 error["input"] = None 1abcde
273 return error # type: ignore[return-value] 1abcde
275 def create_body_model( 1abcde
276 *, fields: Sequence[ModelField], model_name: str
277 ) -> Type[BaseModel]:
278 field_params = {f.name: (f.field_info.annotation, f.field_info) for f in fields} 1abcde
279 BodyModel: Type[BaseModel] = create_model(model_name, **field_params) # type: ignore[call-overload] 1abcde
280 return BodyModel 1abcde
282else:
283 from fastapi.openapi.constants import REF_PREFIX as REF_PREFIX 1abcde
284 from pydantic import AnyUrl as Url # noqa: F401 1abcde
285 from pydantic import ( # type: ignore[assignment] 1abcde
286 BaseConfig as BaseConfig, # noqa: F401
287 )
288 from pydantic import ValidationError as ValidationError # noqa: F401 1abcde
289 from pydantic.class_validators import ( # type: ignore[no-redef] 1abcde
290 Validator as Validator, # noqa: F401
291 )
292 from pydantic.error_wrappers import ( # type: ignore[no-redef] 1abcde
293 ErrorWrapper as ErrorWrapper, # noqa: F401
294 )
295 from pydantic.errors import MissingError 1abcde
296 from pydantic.fields import ( # type: ignore[attr-defined] 1abcde
297 SHAPE_FROZENSET,
298 SHAPE_LIST,
299 SHAPE_SEQUENCE,
300 SHAPE_SET,
301 SHAPE_SINGLETON,
302 SHAPE_TUPLE,
303 SHAPE_TUPLE_ELLIPSIS,
304 )
305 from pydantic.fields import FieldInfo as FieldInfo 1abcde
306 from pydantic.fields import ( # type: ignore[no-redef,attr-defined] 1abcde
307 ModelField as ModelField, # noqa: F401
308 )
309 from pydantic.fields import ( # type: ignore[no-redef,attr-defined] 1abcde
310 Required as Required, # noqa: F401
311 )
312 from pydantic.fields import ( # type: ignore[no-redef,attr-defined] 1abcde
313 Undefined as Undefined,
314 )
315 from pydantic.fields import ( # type: ignore[no-redef, attr-defined] 1abcde
316 UndefinedType as UndefinedType, # noqa: F401
317 )
318 from pydantic.schema import ( 1abcde
319 field_schema,
320 get_flat_models_from_fields,
321 get_model_name_map,
322 model_process_schema,
323 )
324 from pydantic.schema import ( # type: ignore[no-redef] # noqa: F401 1abcde
325 get_annotation_from_field_info as get_annotation_from_field_info,
326 )
327 from pydantic.typing import ( # type: ignore[no-redef] 1abcde
328 evaluate_forwardref as evaluate_forwardref, # noqa: F401
329 )
330 from pydantic.utils import ( # type: ignore[no-redef] 1abcde
331 lenient_issubclass as lenient_issubclass, # noqa: F401
332 )
334 GetJsonSchemaHandler = Any # type: ignore[assignment,misc] 1abcde
335 JsonSchemaValue = Dict[str, Any] # type: ignore[misc] 1abcde
336 CoreSchema = Any # type: ignore[assignment,misc] 1abcde
338 sequence_shapes = { 1abcde
339 SHAPE_LIST,
340 SHAPE_SET,
341 SHAPE_FROZENSET,
342 SHAPE_TUPLE,
343 SHAPE_SEQUENCE,
344 SHAPE_TUPLE_ELLIPSIS,
345 }
346 sequence_shape_to_type = { 1abcde
347 SHAPE_LIST: list,
348 SHAPE_SET: set,
349 SHAPE_TUPLE: tuple,
350 SHAPE_SEQUENCE: list,
351 SHAPE_TUPLE_ELLIPSIS: list,
352 }
354 @dataclass 1abcde
355 class GenerateJsonSchema: # type: ignore[no-redef] 1abcde
356 ref_template: str 1abcde
358 class PydanticSchemaGenerationError(Exception): # type: ignore[no-redef] 1abcde
359 pass 1abcde
361 def with_info_plain_validator_function( # type: ignore[misc] 1abcde
362 function: Callable[..., Any],
363 *,
364 ref: Union[str, None] = None,
365 metadata: Any = None,
366 serialization: Any = None,
367 ) -> Any:
368 return {} 1abcde
370 def get_model_definitions( 1abcde
371 *,
372 flat_models: Set[Union[Type[BaseModel], Type[Enum]]],
373 model_name_map: Dict[Union[Type[BaseModel], Type[Enum]], str],
374 ) -> Dict[str, Any]:
375 definitions: Dict[str, Dict[str, Any]] = {} 1abcde
376 for model in flat_models: 1abcde
377 m_schema, m_definitions, m_nested_models = model_process_schema( 1abcde
378 model, model_name_map=model_name_map, ref_prefix=REF_PREFIX
379 )
380 definitions.update(m_definitions) 1abcde
381 model_name = model_name_map[model] 1abcde
382 if "description" in m_schema: 1abcde
383 m_schema["description"] = m_schema["description"].split("\f")[0] 1abcde
384 definitions[model_name] = m_schema 1abcde
385 return definitions 1abcde
387 def is_pv1_scalar_field(field: ModelField) -> bool: 1abcde
388 from fastapi import params 1abcde
390 field_info = field.field_info 1abcde
391 if not ( 1ab
392 field.shape == SHAPE_SINGLETON # type: ignore[attr-defined]
393 and not lenient_issubclass(field.type_, BaseModel)
394 and not lenient_issubclass(field.type_, dict)
395 and not field_annotation_is_sequence(field.type_)
396 and not is_dataclass(field.type_)
397 and not isinstance(field_info, params.Body)
398 ):
399 return False 1abcde
400 if field.sub_fields: # type: ignore[attr-defined] 1abcde
401 if not all( 1abcde
402 is_pv1_scalar_field(f)
403 for f in field.sub_fields # type: ignore[attr-defined]
404 ):
405 return False 1abcde
406 return True 1abcde
408 def is_pv1_scalar_sequence_field(field: ModelField) -> bool: 1abcde
409 if (field.shape in sequence_shapes) and not lenient_issubclass( # type: ignore[attr-defined] 1abcde
410 field.type_, BaseModel
411 ):
412 if field.sub_fields is not None: # type: ignore[attr-defined] 1abcde
413 for sub_field in field.sub_fields: # type: ignore[attr-defined] 1abcde
414 if not is_pv1_scalar_field(sub_field): 1abcde
415 return False 1abcde
416 return True 1abcde
417 if _annotation_is_sequence(field.type_): 1abcde
418 return True 1abcde
419 return False 1abcde
421 def _normalize_errors(errors: Sequence[Any]) -> List[Dict[str, Any]]: 1abcde
422 use_errors: List[Any] = [] 1abcde
423 for error in errors: 1abcde
424 if isinstance(error, ErrorWrapper): 1abcde
425 new_errors = ValidationError( # type: ignore[call-arg] 1abcde
426 errors=[error], model=RequestErrorModel
427 ).errors()
428 use_errors.extend(new_errors) 1abcde
429 elif isinstance(error, list): 1abcde
430 use_errors.extend(_normalize_errors(error)) 1abcde
431 else:
432 use_errors.append(error) 1abcde
433 return use_errors 1abcde
435 def _model_rebuild(model: Type[BaseModel]) -> None: 1abcde
436 model.update_forward_refs() 1abcde
438 def _model_dump( 1abcde
439 model: BaseModel, mode: Literal["json", "python"] = "json", **kwargs: Any
440 ) -> Any:
441 return model.dict(**kwargs) 1abcde
443 def _get_model_config(model: BaseModel) -> Any: 1abcde
444 return model.__config__ # type: ignore[attr-defined] 1abcde
446 def get_schema_from_model_field( 1abcde
447 *,
448 field: ModelField,
449 schema_generator: GenerateJsonSchema,
450 model_name_map: ModelNameMap,
451 field_mapping: Dict[
452 Tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue
453 ],
454 separate_input_output_schemas: bool = True,
455 ) -> Dict[str, Any]:
456 # This expects that GenerateJsonSchema was already used to generate the definitions
457 return field_schema( # type: ignore[no-any-return] 1abcde
458 field, model_name_map=model_name_map, ref_prefix=REF_PREFIX
459 )[0]
461 def get_compat_model_name_map(fields: List[ModelField]) -> ModelNameMap: 1abcde
462 models = get_flat_models_from_fields(fields, known_models=set()) 1abcde
463 return get_model_name_map(models) # type: ignore[no-any-return] 1abcde
465 def get_definitions( 1abcde
466 *,
467 fields: List[ModelField],
468 schema_generator: GenerateJsonSchema,
469 model_name_map: ModelNameMap,
470 separate_input_output_schemas: bool = True,
471 ) -> Tuple[
472 Dict[
473 Tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue
474 ],
475 Dict[str, Dict[str, Any]],
476 ]:
477 models = get_flat_models_from_fields(fields, known_models=set()) 1abcde
478 return {}, get_model_definitions( 1abcde
479 flat_models=models, model_name_map=model_name_map
480 )
482 def is_scalar_field(field: ModelField) -> bool: 1abcde
483 return is_pv1_scalar_field(field) 1abcde
485 def is_sequence_field(field: ModelField) -> bool: 1abcde
486 return field.shape in sequence_shapes or _annotation_is_sequence(field.type_) # type: ignore[attr-defined] 1abcde
488 def is_scalar_sequence_field(field: ModelField) -> bool: 1abcde
489 return is_pv1_scalar_sequence_field(field) 1abcde
491 def is_bytes_field(field: ModelField) -> bool: 1abcde
492 return lenient_issubclass(field.type_, bytes) 1abcde
494 def is_bytes_sequence_field(field: ModelField) -> bool: 1abcde
495 return field.shape in sequence_shapes and lenient_issubclass(field.type_, bytes) # type: ignore[attr-defined] 1abcde
497 def copy_field_info(*, field_info: FieldInfo, annotation: Any) -> FieldInfo: 1abcde
498 return copy(field_info) 1abcde
500 def serialize_sequence_value(*, field: ModelField, value: Any) -> Sequence[Any]: 1abcde
501 return sequence_shape_to_type[field.shape](value) # type: ignore[no-any-return,attr-defined] 1abcde
503 def get_missing_field_error(loc: Tuple[str, ...]) -> Dict[str, Any]: 1abcde
504 missing_field_error = ErrorWrapper(MissingError(), loc=loc) # type: ignore[call-arg] 1abcde
505 new_error = ValidationError([missing_field_error], RequestErrorModel) 1abcde
506 return new_error.errors()[0] # type: ignore[return-value] 1abcde
508 def create_body_model( 1abcde
509 *, fields: Sequence[ModelField], model_name: str
510 ) -> Type[BaseModel]:
511 BodyModel = create_model(model_name) 1abcde
512 for f in fields: 1abcde
513 BodyModel.__fields__[f.name] = f # type: ignore[index] 1abcde
514 return BodyModel 1abcde
517def _regenerate_error_with_loc( 1abcde
518 *, errors: Sequence[Any], loc_prefix: Tuple[Union[str, int], ...]
519) -> List[Dict[str, Any]]:
520 updated_loc_errors: List[Any] = [ 1abcde
521 {**err, "loc": loc_prefix + err.get("loc", ())}
522 for err in _normalize_errors(errors)
523 ]
525 return updated_loc_errors 1abcde
528def _annotation_is_sequence(annotation: Union[Type[Any], None]) -> bool: 1abcde
529 if lenient_issubclass(annotation, (str, bytes)): 1abcde
530 return False 1abcde
531 return lenient_issubclass(annotation, sequence_types) 1abcde
534def field_annotation_is_sequence(annotation: Union[Type[Any], None]) -> bool: 1abcde
535 return _annotation_is_sequence(annotation) or _annotation_is_sequence( 1abcde
536 get_origin(annotation)
537 )
540def value_is_sequence(value: Any) -> bool: 1abcde
541 return isinstance(value, sequence_types) and not isinstance(value, (str, bytes)) # type: ignore[arg-type] 1abcde
544def _annotation_is_complex(annotation: Union[Type[Any], None]) -> bool: 1abcde
545 return ( 1abcde
546 lenient_issubclass(annotation, (BaseModel, Mapping, UploadFile))
547 or _annotation_is_sequence(annotation)
548 or is_dataclass(annotation)
549 )
552def field_annotation_is_complex(annotation: Union[Type[Any], None]) -> bool: 1abcde
553 origin = get_origin(annotation) 1abcde
554 if origin is Union or origin is UnionType: 1abcde
555 return any(field_annotation_is_complex(arg) for arg in get_args(annotation)) 1abcde
557 return ( 1abcde
558 _annotation_is_complex(annotation)
559 or _annotation_is_complex(origin)
560 or hasattr(origin, "__pydantic_core_schema__")
561 or hasattr(origin, "__get_pydantic_core_schema__")
562 )
565def field_annotation_is_scalar(annotation: Any) -> bool: 1abcde
566 # handle Ellipsis here to make tuple[int, ...] work nicely
567 return annotation is Ellipsis or not field_annotation_is_complex(annotation) 1abcde
570def field_annotation_is_scalar_sequence(annotation: Union[Type[Any], None]) -> bool: 1abcde
571 origin = get_origin(annotation) 1abcde
572 if origin is Union or origin is UnionType: 1abcde
573 at_least_one_scalar_sequence = False 1abcde
574 for arg in get_args(annotation): 1abcde
575 if field_annotation_is_scalar_sequence(arg): 1abcde
576 at_least_one_scalar_sequence = True 1abcde
577 continue 1abcde
578 elif not field_annotation_is_scalar(arg): 1abcde
579 return False 1abcde
580 return at_least_one_scalar_sequence 1abcde
581 return field_annotation_is_sequence(annotation) and all( 1abcde
582 field_annotation_is_scalar(sub_annotation)
583 for sub_annotation in get_args(annotation)
584 )
587def is_bytes_or_nonable_bytes_annotation(annotation: Any) -> bool: 1abcde
588 if lenient_issubclass(annotation, bytes): 1abcde
589 return True 1abcde
590 origin = get_origin(annotation) 1abcde
591 if origin is Union or origin is UnionType: 1abcde
592 for arg in get_args(annotation): 1abcde
593 if lenient_issubclass(arg, bytes): 1abcde
594 return True 1abcde
595 return False 1abcde
598def is_uploadfile_or_nonable_uploadfile_annotation(annotation: Any) -> bool: 1abcde
599 if lenient_issubclass(annotation, UploadFile): 1abcde
600 return True 1abcde
601 origin = get_origin(annotation) 1abcde
602 if origin is Union or origin is UnionType: 1abcde
603 for arg in get_args(annotation): 1abcde
604 if lenient_issubclass(arg, UploadFile): 1abcde
605 return True 1abcde
606 return False 1abcde
609def is_bytes_sequence_annotation(annotation: Any) -> bool: 1abcde
610 origin = get_origin(annotation) 1abcde
611 if origin is Union or origin is UnionType: 1abcde
612 at_least_one = False 1abcde
613 for arg in get_args(annotation): 1abcde
614 if is_bytes_sequence_annotation(arg): 1abcde
615 at_least_one = True 1abcde
616 continue 1abcde
617 return at_least_one 1abcde
618 return field_annotation_is_sequence(annotation) and all( 1abcde
619 is_bytes_or_nonable_bytes_annotation(sub_annotation)
620 for sub_annotation in get_args(annotation)
621 )
624def is_uploadfile_sequence_annotation(annotation: Any) -> bool: 1abcde
625 origin = get_origin(annotation) 1abcde
626 if origin is Union or origin is UnionType: 1abcde
627 at_least_one = False 1abcde
628 for arg in get_args(annotation): 1abcde
629 if is_uploadfile_sequence_annotation(arg): 1abcde
630 at_least_one = True 1abcde
631 continue 1abcde
632 return at_least_one 1abcde
633 return field_annotation_is_sequence(annotation) and all( 1abcde
634 is_uploadfile_or_nonable_uploadfile_annotation(sub_annotation)
635 for sub_annotation in get_args(annotation)
636 )