Coverage for fastapi/_compat.py: 100%

300 statements  

coverage.py v7.6.1, created at 2024-08-08 03:53 +0000

1from collections import deque 1abcde

2from copy import copy 1abcde

3from dataclasses import dataclass, is_dataclass 1abcde

4from enum import Enum 1abcde

5from typing import ( 1abcde

6 Any, 

7 Callable, 

8 Deque, 

9 Dict, 

10 FrozenSet, 

11 List, 

12 Mapping, 

13 Sequence, 

14 Set, 

15 Tuple, 

16 Type, 

17 Union, 

18) 

19 

20from fastapi.exceptions import RequestErrorModel 1abcde

21from fastapi.types import IncEx, ModelNameMap, UnionType 1abcde

22from pydantic import BaseModel, create_model 1abcde

23from pydantic.version import VERSION as P_VERSION 1abcde

24from starlette.datastructures import UploadFile 1abcde

25from typing_extensions import Annotated, Literal, get_args, get_origin 1abcde

26 

27# Reassign the variable so it is re-exported for mypy

28PYDANTIC_VERSION = P_VERSION 1abcde

29PYDANTIC_V2 = PYDANTIC_VERSION.startswith("2.") 1abcde

30 

31 

32sequence_annotation_to_type = { 1abcde

33 Sequence: list, 

34 List: list, 

35 list: list, 

36 Tuple: tuple, 

37 tuple: tuple, 

38 Set: set, 

39 set: set, 

40 FrozenSet: frozenset, 

41 frozenset: frozenset, 

42 Deque: deque, 

43 deque: deque, 

44} 

45 

46sequence_types = tuple(sequence_annotation_to_type.keys()) 1abcde

47 
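# Illustrative sketch (not part of fastapi/_compat.py): the table above is
# consulted via get_origin() to turn a generic annotation back into a concrete
# container constructor (see serialize_sequence_value further down).
assert get_origin(List[int]) is list
assert sequence_annotation_to_type[get_origin(List[int])] is list
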

48if PYDANTIC_V2: 1abcde

49 from pydantic import PydanticSchemaGenerationError as PydanticSchemaGenerationError 1abcde

50 from pydantic import TypeAdapter 1abcde

51 from pydantic import ValidationError as ValidationError 1abcde

52 from pydantic._internal._schema_generation_shared import ( # type: ignore[attr-defined] 1abcde

53 GetJsonSchemaHandler as GetJsonSchemaHandler, 

54 ) 

55 from pydantic._internal._typing_extra import eval_type_lenient 1abcde

56 from pydantic._internal._utils import lenient_issubclass as lenient_issubclass 1abcde

57 from pydantic.fields import FieldInfo 1abcde

58 from pydantic.json_schema import GenerateJsonSchema as GenerateJsonSchema 1abcde

59 from pydantic.json_schema import JsonSchemaValue as JsonSchemaValue 1abcde

60 from pydantic_core import CoreSchema as CoreSchema 1abcde

61 from pydantic_core import PydanticUndefined, PydanticUndefinedType 1abcde

62 from pydantic_core import Url as Url 1abcde

63 

64 try: 1abcde

65 from pydantic_core.core_schema import ( 1abcde

66 with_info_plain_validator_function as with_info_plain_validator_function, 

67 ) 

68 except ImportError: # pragma: no cover 

69 from pydantic_core.core_schema import ( 

70 general_plain_validator_function as with_info_plain_validator_function, # noqa: F401 

71 ) 

72 

73 Required = PydanticUndefined 1abcde

74 Undefined = PydanticUndefined 1abcde

75 UndefinedType = PydanticUndefinedType 1abcde

76 evaluate_forwardref = eval_type_lenient 1abcde

77 Validator = Any 1abcde

78 

79 class BaseConfig: 1abcde

80 pass 1abcde

81 

82 class ErrorWrapper(Exception): 1abcde

83 pass 1abcde

84 

85 @dataclass 1abcde

86 class ModelField: 1abcde

87 field_info: FieldInfo 1abcde

88 name: str 1abcde

89 mode: Literal["validation", "serialization"] = "validation" 1abcde

90 

91 @property 1abcde

92 def alias(self) -> str: 1abcde

93 a = self.field_info.alias 1abcde

94 return a if a is not None else self.name 1abcde

95 

96 @property 1abcde

97 def required(self) -> bool: 1abcde

98 return self.field_info.is_required() 1abcde

99 

100 @property 1abcde

101 def default(self) -> Any: 1abcde

102 return self.get_default() 1abcde

103 

104 @property 1abcde

105 def type_(self) -> Any: 1abcde

106 return self.field_info.annotation 1abcde

107 

108 def __post_init__(self) -> None: 1abcde

109 self._type_adapter: TypeAdapter[Any] = TypeAdapter( 1abcde

110 Annotated[self.field_info.annotation, self.field_info] 

111 ) 

112 

113 def get_default(self) -> Any: 1abcde

114 if self.field_info.is_required(): 1abcde

115 return Undefined 1abcde

116 return self.field_info.get_default(call_default_factory=True) 1abcde

117 

118 def validate( 1abcde

119 self, 

120 value: Any, 

121 values: Dict[str, Any] = {}, # noqa: B006 

122 *, 

123 loc: Tuple[Union[int, str], ...] = (), 

124 ) -> Tuple[Any, Union[List[Dict[str, Any]], None]]: 

125 try: 1abcde

126 return ( 1abcde

127 self._type_adapter.validate_python(value, from_attributes=True), 

128 None, 

129 ) 

130 except ValidationError as exc: 1abcde

131 return None, _regenerate_error_with_loc( 1abcde

132 errors=exc.errors(include_url=False), loc_prefix=loc 

133 ) 

134 

135 def serialize( 1abcde

136 self, 

137 value: Any, 

138 *, 

139 mode: Literal["json", "python"] = "json", 

140 include: Union[IncEx, None] = None, 

141 exclude: Union[IncEx, None] = None, 

142 by_alias: bool = True, 

143 exclude_unset: bool = False, 

144 exclude_defaults: bool = False, 

145 exclude_none: bool = False, 

146 ) -> Any: 

147 # Callers of this method pass a value that has already been validated with

148 # self._type_adapter.validate_python(value)

149 return self._type_adapter.dump_python( 1abcde

150 value, 

151 mode=mode, 

152 include=include, 

153 exclude=exclude, 

154 by_alias=by_alias, 

155 exclude_unset=exclude_unset, 

156 exclude_defaults=exclude_defaults, 

157 exclude_none=exclude_none, 

158 ) 

159 

160 def __hash__(self) -> int: 1abcde

161 # Each ModelField is unique for our purposes, to allow making a dict from 

162 # ModelField to its JSON Schema. 

163 return id(self) 1abcde

164 
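# Illustrative sketch (not part of fastapi/_compat.py): how this Pydantic-v2
# ModelField shim is meant to be used. Assumes Pydantic v2 is installed; the
# field name "limit" and the loc tuple are made up for the example.
field_info = FieldInfo.from_annotation(int)
field = ModelField(field_info=field_info, name="limit")

value, errors = field.validate("10", loc=("query", "limit"))
# value == 10 and errors is None; on failure, value is None and errors is a
# list of error dicts whose "loc" starts with ("query", "limit").
assert field.required and field.alias == "limit"
assert field.serialize(value) == 10
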

165 def get_annotation_from_field_info( 1abcde

166 annotation: Any, field_info: FieldInfo, field_name: str 

167 ) -> Any: 

168 return annotation 1abcde

169 

170 def _normalize_errors(errors: Sequence[Any]) -> List[Dict[str, Any]]: 1abcde

171 return errors # type: ignore[return-value] 1abcde

172 

173 def _model_rebuild(model: Type[BaseModel]) -> None: 1abcde

174 model.model_rebuild() 1abcde

175 

176 def _model_dump( 1abcde

177 model: BaseModel, mode: Literal["json", "python"] = "json", **kwargs: Any 

178 ) -> Any: 

179 return model.model_dump(mode=mode, **kwargs) 1abcde

180 

181 def _get_model_config(model: BaseModel) -> Any: 1abcde

182 return model.model_config 1abcde

183 

184 def get_schema_from_model_field( 1abcde

185 *, 

186 field: ModelField, 

187 schema_generator: GenerateJsonSchema, 

188 model_name_map: ModelNameMap, 

189 field_mapping: Dict[ 

190 Tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue 

191 ], 

192 separate_input_output_schemas: bool = True, 

193 ) -> Dict[str, Any]: 

194 override_mode: Union[Literal["validation"], None] = ( 1abcde

195 None if separate_input_output_schemas else "validation" 

196 ) 

197 # This expects that GenerateJsonSchema was already used to generate the definitions 

198 json_schema = field_mapping[(field, override_mode or field.mode)] 1abcde

199 if "$ref" not in json_schema: 1abcde

200 # TODO remove when deprecating Pydantic v1 

201 # Ref: https://github.com/pydantic/pydantic/blob/d61792cc42c80b13b23e3ffa74bc37ec7c77f7d1/pydantic/schema.py#L207 

202 json_schema["title"] = ( 1abcde

203 field.field_info.title or field.alias.title().replace("_", " ") 

204 ) 

205 return json_schema 1abcde

206 

207 def get_compat_model_name_map(fields: List[ModelField]) -> ModelNameMap: 1abcde

208 return {} 1abcde

209 

210 def get_definitions( 1abcde

211 *, 

212 fields: List[ModelField], 

213 schema_generator: GenerateJsonSchema, 

214 model_name_map: ModelNameMap, 

215 separate_input_output_schemas: bool = True, 

216 ) -> Tuple[ 

217 Dict[ 

218 Tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue 

219 ], 

220 Dict[str, Dict[str, Any]], 

221 ]: 

222 override_mode: Union[Literal["validation"], None] = ( 1abcde

223 None if separate_input_output_schemas else "validation" 

224 ) 

225 inputs = [ 1abcde

226 (field, override_mode or field.mode, field._type_adapter.core_schema) 

227 for field in fields 

228 ] 

229 field_mapping, definitions = schema_generator.generate_definitions( 1abcde

230 inputs=inputs 

231 ) 

232 return field_mapping, definitions # type: ignore[return-value] 1abcde

233 
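# Illustrative sketch (not part of fastapi/_compat.py): how get_definitions and
# get_schema_from_model_field combine when building an OpenAPI schema under
# Pydantic v2. The ref_template string is an assumption made for the example.
schema_generator = GenerateJsonSchema(ref_template="#/components/schemas/{model}")
limit_field = ModelField(field_info=FieldInfo.from_annotation(int), name="limit")
field_mapping, definitions = get_definitions(
    fields=[limit_field],
    schema_generator=schema_generator,
    model_name_map={},
)
schema = get_schema_from_model_field(
    field=limit_field,
    schema_generator=schema_generator,
    model_name_map={},
    field_mapping=field_mapping,
)
# definitions holds shared $defs (empty here, since int has no nested models);
# schema comes back as a plain dict, e.g. {"type": "integer", "title": "Limit"}.
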

234 def is_scalar_field(field: ModelField) -> bool: 1abcde

235 from fastapi import params 1abcde

236 

237 return field_annotation_is_scalar( 1abcde

238 field.field_info.annotation 

239 ) and not isinstance(field.field_info, params.Body) 

240 

241 def is_sequence_field(field: ModelField) -> bool: 1abcde

242 return field_annotation_is_sequence(field.field_info.annotation) 1abcde

243 

244 def is_scalar_sequence_field(field: ModelField) -> bool: 1abcde

245 return field_annotation_is_scalar_sequence(field.field_info.annotation) 1abcde

246 

247 def is_bytes_field(field: ModelField) -> bool: 1abcde

248 return is_bytes_or_nonable_bytes_annotation(field.type_) 1abcde

249 

250 def is_bytes_sequence_field(field: ModelField) -> bool: 1abcde

251 return is_bytes_sequence_annotation(field.type_) 1abcde

252 

253 def copy_field_info(*, field_info: FieldInfo, annotation: Any) -> FieldInfo: 1abcde

254 cls = type(field_info) 1abcde

255 merged_field_info = cls.from_annotation(annotation) 1abcde

256 new_field_info = copy(field_info) 1abcde

257 new_field_info.metadata = merged_field_info.metadata 1abcde

258 new_field_info.annotation = merged_field_info.annotation 1abcde

259 return new_field_info 1abcde

260 

261 def serialize_sequence_value(*, field: ModelField, value: Any) -> Sequence[Any]: 1abcde

262 origin_type = ( 1abcde

263 get_origin(field.field_info.annotation) or field.field_info.annotation 

264 ) 

265 assert issubclass(origin_type, sequence_types) # type: ignore[arg-type] 1abcde

266 return sequence_annotation_to_type[origin_type](value) # type: ignore[no-any-return] 1abcde

267 
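# Illustrative sketch (not part of fastapi/_compat.py): serialize_sequence_value
# rebuilds the container type declared in the annotation. Assumes Pydantic v2;
# the field name "tags" is made up for the example.
tags_field = ModelField(field_info=FieldInfo.from_annotation(Set[int]), name="tags")
assert serialize_sequence_value(field=tags_field, value=[1, 2, 2]) == {1, 2}
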

268 def get_missing_field_error(loc: Tuple[str, ...]) -> Dict[str, Any]: 1abcde

269 error = ValidationError.from_exception_data( 1abcde

270 "Field required", [{"type": "missing", "loc": loc, "input": {}}] 

271 ).errors(include_url=False)[0] 

272 error["input"] = None 1abcde

273 return error # type: ignore[return-value] 1abcde

274 
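# Illustrative sketch (not part of fastapi/_compat.py): the shape of the error
# dict produced above, assuming Pydantic v2.
err = get_missing_field_error(("body", "name"))
# Roughly: {"type": "missing", "loc": ("body", "name"),
#           "msg": "Field required", "input": None}
assert err["type"] == "missing" and err["input"] is None
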

275 def create_body_model( 1abcde

276 *, fields: Sequence[ModelField], model_name: str 

277 ) -> Type[BaseModel]: 

278 field_params = {f.name: (f.field_info.annotation, f.field_info) for f in fields} 1abcde

279 BodyModel: Type[BaseModel] = create_model(model_name, **field_params) # type: ignore[call-overload] 1abcde

280 return BodyModel 1abcde

281 

282else: 

283 from fastapi.openapi.constants import REF_PREFIX as REF_PREFIX 1abcde

284 from pydantic import AnyUrl as Url # noqa: F401 1abcde

285 from pydantic import ( # type: ignore[assignment] 1abcde

286 BaseConfig as BaseConfig, # noqa: F401 

287 ) 

288 from pydantic import ValidationError as ValidationError # noqa: F401 1abcde

289 from pydantic.class_validators import ( # type: ignore[no-redef] 1abcde

290 Validator as Validator, # noqa: F401 

291 ) 

292 from pydantic.error_wrappers import ( # type: ignore[no-redef] 1abcde

293 ErrorWrapper as ErrorWrapper, # noqa: F401 

294 ) 

295 from pydantic.errors import MissingError 1abcde

296 from pydantic.fields import ( # type: ignore[attr-defined] 1abcde

297 SHAPE_FROZENSET, 

298 SHAPE_LIST, 

299 SHAPE_SEQUENCE, 

300 SHAPE_SET, 

301 SHAPE_SINGLETON, 

302 SHAPE_TUPLE, 

303 SHAPE_TUPLE_ELLIPSIS, 

304 ) 

305 from pydantic.fields import FieldInfo as FieldInfo 1abcde

306 from pydantic.fields import ( # type: ignore[no-redef,attr-defined] 1abcde

307 ModelField as ModelField, # noqa: F401 

308 ) 

309 from pydantic.fields import ( # type: ignore[no-redef,attr-defined] 1abcde

310 Required as Required, # noqa: F401 

311 ) 

312 from pydantic.fields import ( # type: ignore[no-redef,attr-defined] 1abcde

313 Undefined as Undefined, 

314 ) 

315 from pydantic.fields import ( # type: ignore[no-redef, attr-defined] 1abcde

316 UndefinedType as UndefinedType, # noqa: F401 

317 ) 

318 from pydantic.schema import ( 1abcde

319 field_schema, 

320 get_flat_models_from_fields, 

321 get_model_name_map, 

322 model_process_schema, 

323 ) 

324 from pydantic.schema import ( # type: ignore[no-redef] # noqa: F401 1abcde

325 get_annotation_from_field_info as get_annotation_from_field_info, 

326 ) 

327 from pydantic.typing import ( # type: ignore[no-redef] 1abcde

328 evaluate_forwardref as evaluate_forwardref, # noqa: F401 

329 ) 

330 from pydantic.utils import ( # type: ignore[no-redef] 1abcde

331 lenient_issubclass as lenient_issubclass, # noqa: F401 

332 ) 

333 

334 GetJsonSchemaHandler = Any # type: ignore[assignment,misc] 1abcde

335 JsonSchemaValue = Dict[str, Any] # type: ignore[misc] 1abcde

336 CoreSchema = Any # type: ignore[assignment,misc] 1abcde

337 

338 sequence_shapes = { 1abcde

339 SHAPE_LIST, 

340 SHAPE_SET, 

341 SHAPE_FROZENSET, 

342 SHAPE_TUPLE, 

343 SHAPE_SEQUENCE, 

344 SHAPE_TUPLE_ELLIPSIS, 

345 } 

346 sequence_shape_to_type = { 1abcde

347 SHAPE_LIST: list, 

348 SHAPE_SET: set, 

349 SHAPE_TUPLE: tuple, 

350 SHAPE_SEQUENCE: list, 

351 SHAPE_TUPLE_ELLIPSIS: list, 

352 } 

353 

354 @dataclass 1abcde

355 class GenerateJsonSchema: # type: ignore[no-redef] 1abcde

356 ref_template: str 1abcde

357 

358 class PydanticSchemaGenerationError(Exception): # type: ignore[no-redef] 1abcde

359 pass 1abcde

360 

361 def with_info_plain_validator_function( # type: ignore[misc] 1abcde

362 function: Callable[..., Any], 

363 *, 

364 ref: Union[str, None] = None, 

365 metadata: Any = None, 

366 serialization: Any = None, 

367 ) -> Any: 

368 return {} 1abcde

369 

370 def get_model_definitions( 1abcde

371 *, 

372 flat_models: Set[Union[Type[BaseModel], Type[Enum]]], 

373 model_name_map: Dict[Union[Type[BaseModel], Type[Enum]], str], 

374 ) -> Dict[str, Any]: 

375 definitions: Dict[str, Dict[str, Any]] = {} 1abcde

376 for model in flat_models: 1abcde

377 m_schema, m_definitions, m_nested_models = model_process_schema( 1abcde

378 model, model_name_map=model_name_map, ref_prefix=REF_PREFIX 

379 ) 

380 definitions.update(m_definitions) 1abcde

381 model_name = model_name_map[model] 1abcde

382 if "description" in m_schema: 1abcde

383 m_schema["description"] = m_schema["description"].split("\f")[0] 1abcde

384 definitions[model_name] = m_schema 1abcde

385 return definitions 1abcde

386 

387 def is_pv1_scalar_field(field: ModelField) -> bool: 1abcde

388 from fastapi import params 1abcde

389 

390 field_info = field.field_info 1abcde

391 if not ( 1ab

392 field.shape == SHAPE_SINGLETON # type: ignore[attr-defined] 

393 and not lenient_issubclass(field.type_, BaseModel) 

394 and not lenient_issubclass(field.type_, dict) 

395 and not field_annotation_is_sequence(field.type_) 

396 and not is_dataclass(field.type_) 

397 and not isinstance(field_info, params.Body) 

398 ): 

399 return False 1abcde

400 if field.sub_fields: # type: ignore[attr-defined] 1abcde

401 if not all( 1abcde

402 is_pv1_scalar_field(f) 

403 for f in field.sub_fields # type: ignore[attr-defined] 

404 ): 

405 return False 1abcde

406 return True 1abcde

407 

408 def is_pv1_scalar_sequence_field(field: ModelField) -> bool: 1abcde

409 if (field.shape in sequence_shapes) and not lenient_issubclass( # type: ignore[attr-defined] 1abcde

410 field.type_, BaseModel 

411 ): 

412 if field.sub_fields is not None: # type: ignore[attr-defined] 1abcde

413 for sub_field in field.sub_fields: # type: ignore[attr-defined] 1abcde

414 if not is_pv1_scalar_field(sub_field): 1abcde

415 return False 1abcde

416 return True 1abcde

417 if _annotation_is_sequence(field.type_): 1abcde

418 return True 1abcde

419 return False 1abcde

420 

421 def _normalize_errors(errors: Sequence[Any]) -> List[Dict[str, Any]]: 1abcde

422 use_errors: List[Any] = [] 1abcde

423 for error in errors: 1abcde

424 if isinstance(error, ErrorWrapper): 1abcde

425 new_errors = ValidationError( # type: ignore[call-arg] 1abcde

426 errors=[error], model=RequestErrorModel 

427 ).errors() 

428 use_errors.extend(new_errors) 1abcde

429 elif isinstance(error, list): 1abcde

430 use_errors.extend(_normalize_errors(error)) 1abcde

431 else: 

432 use_errors.append(error) 1abcde

433 return use_errors 1abcde

434 

435 def _model_rebuild(model: Type[BaseModel]) -> None: 1abcde

436 model.update_forward_refs() 1abcde

437 

438 def _model_dump( 1abcde

439 model: BaseModel, mode: Literal["json", "python"] = "json", **kwargs: Any 

440 ) -> Any: 

441 return model.dict(**kwargs) 1abcde

442 

443 def _get_model_config(model: BaseModel) -> Any: 1abcde

444 return model.__config__ # type: ignore[attr-defined] 1abcde

445 

446 def get_schema_from_model_field( 1abcde

447 *, 

448 field: ModelField, 

449 schema_generator: GenerateJsonSchema, 

450 model_name_map: ModelNameMap, 

451 field_mapping: Dict[ 

452 Tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue 

453 ], 

454 separate_input_output_schemas: bool = True, 

455 ) -> Dict[str, Any]: 

456 # This expects that GenerateJsonSchema was already used to generate the definitions 

457 return field_schema( # type: ignore[no-any-return] 1abcde

458 field, model_name_map=model_name_map, ref_prefix=REF_PREFIX 

459 )[0] 

460 

461 def get_compat_model_name_map(fields: List[ModelField]) -> ModelNameMap: 1abcde

462 models = get_flat_models_from_fields(fields, known_models=set()) 1abcde

463 return get_model_name_map(models) # type: ignore[no-any-return] 1abcde

464 

465 def get_definitions( 1abcde

466 *, 

467 fields: List[ModelField], 

468 schema_generator: GenerateJsonSchema, 

469 model_name_map: ModelNameMap, 

470 separate_input_output_schemas: bool = True, 

471 ) -> Tuple[ 

472 Dict[ 

473 Tuple[ModelField, Literal["validation", "serialization"]], JsonSchemaValue 

474 ], 

475 Dict[str, Dict[str, Any]], 

476 ]: 

477 models = get_flat_models_from_fields(fields, known_models=set()) 1abcde

478 return {}, get_model_definitions( 1abcde

479 flat_models=models, model_name_map=model_name_map 

480 ) 

481 

482 def is_scalar_field(field: ModelField) -> bool: 1abcde

483 return is_pv1_scalar_field(field) 1abcde

484 

485 def is_sequence_field(field: ModelField) -> bool: 1abcde

486 return field.shape in sequence_shapes or _annotation_is_sequence(field.type_) # type: ignore[attr-defined] 1abcde

487 

488 def is_scalar_sequence_field(field: ModelField) -> bool: 1abcde

489 return is_pv1_scalar_sequence_field(field) 1abcde

490 

491 def is_bytes_field(field: ModelField) -> bool: 1abcde

492 return lenient_issubclass(field.type_, bytes) 1abcde

493 

494 def is_bytes_sequence_field(field: ModelField) -> bool: 1abcde

495 return field.shape in sequence_shapes and lenient_issubclass(field.type_, bytes) # type: ignore[attr-defined] 1abcde

496 

497 def copy_field_info(*, field_info: FieldInfo, annotation: Any) -> FieldInfo: 1abcde

498 return copy(field_info) 1abcde

499 

500 def serialize_sequence_value(*, field: ModelField, value: Any) -> Sequence[Any]: 1abcde

501 return sequence_shape_to_type[field.shape](value) # type: ignore[no-any-return,attr-defined] 1abcde

502 

503 def get_missing_field_error(loc: Tuple[str, ...]) -> Dict[str, Any]: 1abcde

504 missing_field_error = ErrorWrapper(MissingError(), loc=loc) # type: ignore[call-arg] 1abcde

505 new_error = ValidationError([missing_field_error], RequestErrorModel) 1abcde

506 return new_error.errors()[0] # type: ignore[return-value] 1abcde

507 

508 def create_body_model( 1abcde

509 *, fields: Sequence[ModelField], model_name: str 

510 ) -> Type[BaseModel]: 

511 BodyModel = create_model(model_name) 1abcde

512 for f in fields: 1abcde

513 BodyModel.__fields__[f.name] = f # type: ignore[index] 1abcde

514 return BodyModel 1abcde

515 

516 

517def _regenerate_error_with_loc( 1abcde

518 *, errors: Sequence[Any], loc_prefix: Tuple[Union[str, int], ...] 

519) -> List[Dict[str, Any]]: 

520 updated_loc_errors: List[Any] = [ 1abcde

521 {**err, "loc": loc_prefix + err.get("loc", ())} 

522 for err in _normalize_errors(errors) 

523 ] 

524 

525 return updated_loc_errors 1abcde

526 

527 
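# Illustrative sketch (not part of fastapi/_compat.py): _regenerate_error_with_loc
# prefixes each error's "loc" with the location of the parameter being validated.
# The error dict below is a made-up example in Pydantic v2's error format.
raw_errors = [{"type": "int_parsing", "loc": ("limit",), "msg": "...", "input": "abc"}]
prefixed = _regenerate_error_with_loc(errors=raw_errors, loc_prefix=("query",))
assert prefixed[0]["loc"] == ("query", "limit")
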

528def _annotation_is_sequence(annotation: Union[Type[Any], None]) -> bool: 1abcde

529 if lenient_issubclass(annotation, (str, bytes)): 1abcde

530 return False 1abcde

531 return lenient_issubclass(annotation, sequence_types) 1abcde

532 

533 

534def field_annotation_is_sequence(annotation: Union[Type[Any], None]) -> bool: 1abcde

535 return _annotation_is_sequence(annotation) or _annotation_is_sequence( 1abcde

536 get_origin(annotation) 

537 ) 

538 

539 

540def value_is_sequence(value: Any) -> bool: 1abcde

541 return isinstance(value, sequence_types) and not isinstance(value, (str, bytes)) # type: ignore[arg-type] 1abcde

542 

543 

544def _annotation_is_complex(annotation: Union[Type[Any], None]) -> bool: 1abcde

545 return ( 1abcde

546 lenient_issubclass(annotation, (BaseModel, Mapping, UploadFile)) 

547 or _annotation_is_sequence(annotation) 

548 or is_dataclass(annotation) 

549 ) 

550 

551 

552def field_annotation_is_complex(annotation: Union[Type[Any], None]) -> bool: 1abcde

553 origin = get_origin(annotation) 1abcde

554 if origin is Union or origin is UnionType: 1abcde

555 return any(field_annotation_is_complex(arg) for arg in get_args(annotation)) 1abcde

556 

557 return ( 1abcde

558 _annotation_is_complex(annotation) 

559 or _annotation_is_complex(origin) 

560 or hasattr(origin, "__pydantic_core_schema__") 

561 or hasattr(origin, "__get_pydantic_core_schema__") 

562 ) 

563 

564 

565def field_annotation_is_scalar(annotation: Any) -> bool: 1abcde

566 # handle Ellipsis here to make tuple[int, ...] work nicely 

567 return annotation is Ellipsis or not field_annotation_is_complex(annotation) 1abcde

568 

569 

570def field_annotation_is_scalar_sequence(annotation: Union[Type[Any], None]) -> bool: 1abcde

571 origin = get_origin(annotation) 1abcde

572 if origin is Union or origin is UnionType: 1abcde

573 at_least_one_scalar_sequence = False 1abcde

574 for arg in get_args(annotation): 1abcde

575 if field_annotation_is_scalar_sequence(arg): 1abcde

576 at_least_one_scalar_sequence = True 1abcde

577 continue 1abcde

578 elif not field_annotation_is_scalar(arg): 1abcde

579 return False 1abcde

580 return at_least_one_scalar_sequence 1abcde

581 return field_annotation_is_sequence(annotation) and all( 1abcde

582 field_annotation_is_scalar(sub_annotation) 

583 for sub_annotation in get_args(annotation) 

584 ) 

585 

586 
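# Illustrative sketch (not part of fastapi/_compat.py): expected results of the
# scalar / scalar-sequence checks above for a few common annotations.
assert field_annotation_is_scalar(int)
assert not field_annotation_is_scalar(List[int])
assert field_annotation_is_scalar_sequence(List[int])
assert field_annotation_is_scalar_sequence(Union[List[int], None])
assert not field_annotation_is_scalar_sequence(List[BaseModel])
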

587def is_bytes_or_nonable_bytes_annotation(annotation: Any) -> bool: 1abcde

588 if lenient_issubclass(annotation, bytes): 1abcde

589 return True 1abcde

590 origin = get_origin(annotation) 1abcde

591 if origin is Union or origin is UnionType: 1abcde

592 for arg in get_args(annotation): 1abcde

593 if lenient_issubclass(arg, bytes): 1abcde

594 return True 1abcde

595 return False 1abcde

596 

597 

598def is_uploadfile_or_nonable_uploadfile_annotation(annotation: Any) -> bool: 1abcde

599 if lenient_issubclass(annotation, UploadFile): 1abcde

600 return True 1abcde

601 origin = get_origin(annotation) 1abcde

602 if origin is Union or origin is UnionType: 1abcde

603 for arg in get_args(annotation): 1abcde

604 if lenient_issubclass(arg, UploadFile): 1abcde

605 return True 1abcde

606 return False 1abcde

607 

608 

609def is_bytes_sequence_annotation(annotation: Any) -> bool: 1abcde

610 origin = get_origin(annotation) 1abcde

611 if origin is Union or origin is UnionType: 1abcde

612 at_least_one = False 1abcde

613 for arg in get_args(annotation): 1abcde

614 if is_bytes_sequence_annotation(arg): 1abcde

615 at_least_one = True 1abcde

616 continue 1abcde

617 return at_least_one 1abcde

618 return field_annotation_is_sequence(annotation) and all( 1abcde

619 is_bytes_or_nonable_bytes_annotation(sub_annotation) 

620 for sub_annotation in get_args(annotation) 

621 ) 

622 

623 

624def is_uploadfile_sequence_annotation(annotation: Any) -> bool: 1abcde

625 origin = get_origin(annotation) 1abcde

626 if origin is Union or origin is UnionType: 1abcde

627 at_least_one = False 1abcde

628 for arg in get_args(annotation): 1abcde

629 if is_uploadfile_sequence_annotation(arg): 1abcde

630 at_least_one = True 1abcde

631 continue 1abcde

632 return at_least_one 1abcde

633 return field_annotation_is_sequence(annotation) and all( 1abcde

634 is_uploadfile_or_nonable_uploadfile_annotation(sub_annotation) 

635 for sub_annotation in get_args(annotation) 

636 )
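
# Illustrative sketch (not part of fastapi/_compat.py): the two sequence checks
# above are used when deciding whether a form field should accept several
# uploaded files or raw byte blobs. The annotations below are examples only.
assert is_uploadfile_sequence_annotation(List[UploadFile])
assert is_bytes_sequence_annotation(Union[List[bytes], None])
assert not is_bytes_sequence_annotation(bytes)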