Coverage for pydantic/_internal/_generate_schema.py: 95.19%
1351 statements
« prev ^ index » next coverage.py v7.8.1, created at 2025-05-22 20:36 +0000
« prev ^ index » next coverage.py v7.8.1, created at 2025-05-22 20:36 +0000
1"""Convert python types to pydantic-core schema."""
3from __future__ import annotations as _annotations 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
5import collections.abc 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
6import dataclasses 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
7import datetime 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
8import inspect 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
9import os 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
10import pathlib 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
11import re 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
12import sys 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
13import typing 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
14import warnings 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
15from collections.abc import Generator, Iterable, Iterator, Mapping 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
16from contextlib import contextmanager 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
17from copy import copy 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
18from decimal import Decimal 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
19from enum import Enum 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
20from fractions import Fraction 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
21from functools import partial 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
22from inspect import Parameter, _ParameterKind, signature 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
23from ipaddress import IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
24from itertools import chain 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
25from operator import attrgetter 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
26from types import FunctionType, GenericAlias, LambdaType, MethodType 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
27from typing import ( 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
28 TYPE_CHECKING,
29 Any,
30 Callable,
31 Final,
32 ForwardRef,
33 Literal,
34 TypeVar,
35 Union,
36 cast,
37 overload,
38)
39from uuid import UUID 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
40from warnings import warn 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
41from zoneinfo import ZoneInfo 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
43import typing_extensions 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
44from pydantic_core import ( 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
45 CoreSchema,
46 MultiHostUrl,
47 PydanticCustomError,
48 PydanticSerializationUnexpectedValue,
49 PydanticUndefined,
50 Url,
51 core_schema,
52 to_jsonable_python,
53)
54from typing_extensions import TypeAlias, TypeAliasType, TypedDict, get_args, get_origin, is_typeddict 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
55from typing_inspection import typing_objects 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
56from typing_inspection.introspection import AnnotationSource, get_literal_values, is_union_origin 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
58from ..aliases import AliasChoices, AliasGenerator, AliasPath 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
59from ..annotated_handlers import GetCoreSchemaHandler, GetJsonSchemaHandler 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
60from ..config import ConfigDict, JsonDict, JsonEncoder, JsonSchemaExtraCallable 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
61from ..errors import PydanticSchemaGenerationError, PydanticUndefinedAnnotation, PydanticUserError 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
62from ..functional_validators import AfterValidator, BeforeValidator, FieldValidatorModes, PlainValidator, WrapValidator 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
63from ..json_schema import JsonSchemaValue 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
64from ..version import version_short 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
65from ..warnings import PydanticDeprecatedSince20 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
66from . import _decorators, _discriminated_union, _known_annotated_metadata, _repr, _typing_extra 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
67from ._config import ConfigWrapper, ConfigWrapperStack 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
68from ._core_metadata import CoreMetadata, update_core_metadata 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
69from ._core_utils import ( 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
70 get_ref,
71 get_type_ref,
72 is_list_like_schema_with_items_schema,
73 validate_core_schema,
74)
75from ._decorators import ( 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
76 Decorator,
77 DecoratorInfos,
78 FieldSerializerDecoratorInfo,
79 FieldValidatorDecoratorInfo,
80 ModelSerializerDecoratorInfo,
81 ModelValidatorDecoratorInfo,
82 RootValidatorDecoratorInfo,
83 ValidatorDecoratorInfo,
84 get_attribute_from_bases,
85 inspect_field_serializer,
86 inspect_model_serializer,
87 inspect_validator,
88)
89from ._docs_extraction import extract_docstrings_from_cls 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
90from ._fields import collect_dataclass_fields, rebuild_model_fields, takes_validated_data_argument 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
91from ._forward_ref import PydanticRecursiveRef 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
92from ._generics import get_standard_typevars_map, replace_types 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
93from ._import_utils import import_cached_base_model, import_cached_field_info 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
94from ._mock_val_ser import MockCoreSchema 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
95from ._namespace_utils import NamespacesTuple, NsResolver 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
96from ._schema_gather import MissingDefinitionError, gather_schemas_for_cleaning 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
97from ._schema_generation_shared import CallbackGetCoreSchemaHandler 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
98from ._utils import lenient_issubclass, smart_deepcopy 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
100if TYPE_CHECKING: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
101 from ..fields import ComputedFieldInfo, FieldInfo
102 from ..main import BaseModel
103 from ..types import Discriminator
104 from ._dataclasses import StandardDataclass
105 from ._schema_generation_shared import GetJsonSchemaFunction
# `True` when the running interpreter is Python 3.12 or newer.
# NOTE(review): presumably gates acceptance of `typing.TypedDict` (as opposed to the
# `typing_extensions` backport) — confirm against the usage site.
_SUPPORTS_TYPEDDICT = sys.version_info >= (3, 12)

# Decorator info types that target specific fields (each is read through its `fields` attribute below).
FieldDecoratorInfo = Union[ValidatorDecoratorInfo, FieldValidatorDecoratorInfo, FieldSerializerDecoratorInfo]
FieldDecoratorInfoType = TypeVar('FieldDecoratorInfoType', bound=FieldDecoratorInfo)
# A `Decorator` wrapping any one of the field-targeting decorator info types.
AnyFieldDecorator = Union[
    Decorator[ValidatorDecoratorInfo],
    Decorator[FieldValidatorDecoratorInfo],
    Decorator[FieldSerializerDecoratorInfo],
]

# Callable aliases used in signatures throughout this module.
ModifyCoreSchemaWrapHandler: TypeAlias = GetCoreSchemaHandler
GetCoreSchemaFunction: TypeAlias = Callable[[Any, ModifyCoreSchemaWrapHandler], core_schema.CoreSchema]
# Kept as a string so that the `X | None` union is not evaluated at import time.
ParametersCallback: TypeAlias = "Callable[[int, str, Any], Literal['skip'] | None]"

# Groups of type objects (both `typing` aliases and their runtime counterparts) that share a schema family.
TUPLE_TYPES: list[type] = [typing.Tuple, tuple]  # noqa: UP006
LIST_TYPES: list[type] = [typing.List, list, collections.abc.MutableSequence]  # noqa: UP006
SET_TYPES: list[type] = [typing.Set, set, collections.abc.MutableSet]  # noqa: UP006
FROZEN_SET_TYPES: list[type] = [typing.FrozenSet, frozenset, collections.abc.Set]  # noqa: UP006
DICT_TYPES: list[type] = [typing.Dict, dict]  # noqa: UP006
IP_TYPES: list[type] = [IPv4Address, IPv4Interface, IPv4Network, IPv6Address, IPv6Interface, IPv6Network]
SEQUENCE_TYPES: list[type] = [typing.Sequence, collections.abc.Sequence]
ITERABLE_TYPES: list[type] = [typing.Iterable, collections.abc.Iterable, typing.Generator, collections.abc.Generator]
TYPE_TYPES: list[type] = [typing.Type, type]  # noqa: UP006
PATTERN_TYPES: list[type] = [typing.Pattern, re.Pattern]
PATH_TYPES: list[type] = [
    os.PathLike,
    pathlib.Path,
    pathlib.PurePath,
    pathlib.PosixPath,
    pathlib.PurePosixPath,
    pathlib.PureWindowsPath,
]
MAPPING_TYPES = [
    typing.Mapping,
    typing.MutableMapping,
    collections.abc.Mapping,
    collections.abc.MutableMapping,
    collections.OrderedDict,
    typing_extensions.OrderedDict,
    typing.DefaultDict,  # noqa: UP006
    collections.defaultdict,
]
COUNTER_TYPES = [collections.Counter, typing.Counter]
DEQUE_TYPES: list[type] = [collections.deque, typing.Deque]  # noqa: UP006

# Note: This does not play very well with type checkers. For example,
# `a: LambdaType = lambda x: x` will raise a type error by Pyright.
ValidateCallSupportedTypes = Union[
    LambdaType,
    FunctionType,
    MethodType,
    partial,
]

# Runtime tuple of the members of the union above, obtained via `get_args`.
VALIDATE_CALL_SUPPORTED_TYPES = get_args(ValidateCallSupportedTypes)

# Maps each field validator mode name to the validator class of the same mode.
_mode_to_validator: dict[
    FieldValidatorModes, type[BeforeValidator | AfterValidator | PlainValidator | WrapValidator]
] = {'before': BeforeValidator, 'after': AfterValidator, 'plain': PlainValidator, 'wrap': WrapValidator}
def check_validator_fields_against_field_name(
    info: FieldDecoratorInfo,
    field: str,
) -> bool:
    """Tell whether a field decorator applies to the given field name.

    Args:
        info: The decorator info whose `fields` attribute lists the targeted field names.
        field: The field name to look for.

    Returns:
        `True` if `info.fields` contains the wildcard `'*'` or `field` itself, `False` otherwise.
    """
    targeted = info.fields
    # The wildcard matches every field, so there is no need to look for the name itself.
    if '*' in targeted:
        return True
    return field in targeted
def check_decorator_fields_exist(decorators: Iterable[AnyFieldDecorator], fields: Iterable[str]) -> None:
    """Ensure every field named by a decorator is present in `fields`.

    A decorator is not checked when it targets `'*'` or was declared with `check_fields=False`.

    Args:
        decorators: An iterable of field decorators.
        fields: An iterable of the known field names.

    Raises:
        PydanticUserError: If a checked decorator names a field that is not in `fields`.
    """
    known_fields = set(fields)
    for dec in decorators:
        dec_info = dec.info
        # Wildcard decorators and decorators that opted out are exempt from the existence check.
        if '*' in dec_info.fields or dec_info.check_fields is False:
            continue
        if any(name not in known_fields for name in dec_info.fields):
            raise PydanticUserError(
                f'Decorators defined with incorrect fields: {dec.cls_ref}.{dec.cls_var_name}'
                " (use check_fields=False if you're inheriting from the model and intended this)",
                code='decorator-missing-field',
            )
def filter_field_decorator_info_by_field(
    validator_functions: Iterable[Decorator[FieldDecoratorInfoType]], field: str
) -> list[Decorator[FieldDecoratorInfoType]]:
    """Keep only the decorators that apply to `field`.

    Args:
        validator_functions: The decorators to filter.
        field: The field name the decorators must target (directly or through `'*'`).

    Returns:
        A new list holding the matching decorators, in their original order.
    """
    matching: list[Decorator[FieldDecoratorInfoType]] = []
    for dec in validator_functions:
        if check_validator_fields_against_field_name(dec.info, field):
            matching.append(dec)
    return matching
def apply_each_item_validators(
    schema: core_schema.CoreSchema,
    each_item_validators: list[Decorator[ValidatorDecoratorInfo]],
    field_name: str | None,
) -> core_schema.CoreSchema:
    """Push `each_item=True` validators down onto the item schema of a container schema.

    The given `schema` is mutated in place and returned.

    Args:
        schema: The schema of the field the validators were declared on.
        each_item_validators: The validators to apply to each item; when empty, `schema` is returned untouched.
        field_name: The field name forwarded to `apply_validators`.

    Returns:
        The same `schema` object, with the validators applied to its items where applicable.

    Raises:
        TypeError: If the schema type is not nullable, tuple, list-like or dict.
    """
    # This V1 compatibility shim should eventually be removed

    # nothing to push down when there are no `each_item` validators
    if not each_item_validators:
        return schema

    # push down any `each_item=True` validators
    # note that this won't work for any Annotated types that get wrapped by a function validator
    # but that's okay because that didn't exist in V1
    schema_type = schema['type']
    if schema_type == 'nullable':
        # look through the nullable wrapper and handle the wrapped schema instead
        schema['schema'] = apply_each_item_validators(schema['schema'], each_item_validators, field_name)
        return schema

    if schema_type == 'tuple':
        # only the variadic item (if any) receives the validators
        variadic_item_index = schema.get('variadic_item_index')
        if variadic_item_index is not None:
            items = schema['items_schema']
            items[variadic_item_index] = apply_validators(
                items[variadic_item_index],
                each_item_validators,
                field_name,
            )
        return schema

    if is_list_like_schema_with_items_schema(schema):
        inner_schema = schema.get('items_schema', core_schema.any_schema())
        schema['items_schema'] = apply_validators(inner_schema, each_item_validators, field_name)
        return schema

    if schema_type == 'dict':
        inner_schema = schema.get('values_schema', core_schema.any_schema())
        schema['values_schema'] = apply_validators(inner_schema, each_item_validators, field_name)
        return schema

    raise TypeError(
        f'`@validator(..., each_item=True)` cannot be applied to fields with a schema of {schema["type"]}'
    )
def _extract_json_schema_info_from_field_info(
    info: FieldInfo | ComputedFieldInfo,
) -> tuple[JsonDict | None, JsonDict | JsonSchemaExtraCallable | None]:
    """Collect the JSON schema related attributes of a field info.

    Args:
        info: The field info to read `title`, `description`, `deprecated`, `examples`
            and `json_schema_extra` from.

    Returns:
        A two-tuple: the non-`None` JSON schema updates (or `None` when there are none),
        and `info.json_schema_extra` as is.
    """
    candidates = (
        ('title', info.title),
        ('description', info.description),
        # an empty deprecation message still means "deprecated"
        ('deprecated', bool(info.deprecated) or info.deprecated == '' or None),
        ('examples', to_jsonable_python(info.examples)),
    )
    json_schema_updates = {key: value for key, value in candidates if value is not None}
    return (json_schema_updates or None, info.json_schema_extra)
# Mapping from a type to the encoder used for values of that type
# (consumed by `_add_custom_serialization_from_json_encoders`).
JsonEncoders = dict[type[Any], JsonEncoder]
271def _add_custom_serialization_from_json_encoders( 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
272 json_encoders: JsonEncoders | None, tp: Any, schema: CoreSchema
273) -> CoreSchema:
274 """Iterate over the json_encoders and add the first matching encoder to the schema.
276 Args:
277 json_encoders: A dictionary of types and their encoder functions.
278 tp: The type to check for a matching encoder.
279 schema: The schema to add the encoder to.
280 """
281 if not json_encoders: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
282 return schema 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
283 if 'serialization' in schema: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
284 return schema 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
285 # Check the class type and its superclasses for a matching encoder
286 # Decimal.__class__.__mro__ (and probably other cases) doesn't include Decimal itself
287 # if the type is a GenericAlias (e.g. from list[int]) we need to use __class__ instead of .__mro__
288 for base in (tp, *getattr(tp, '__mro__', tp.__class__.__mro__)[:-1]): 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
289 encoder = json_encoders.get(base) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
290 if encoder is None: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
291 continue 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
293 warnings.warn( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
294 f'`json_encoders` is deprecated. See https://docs.pydantic.dev/{version_short()}/concepts/serialization/#custom-serializers for alternatives',
295 PydanticDeprecatedSince20,
296 )
298 # TODO: in theory we should check that the schema accepts a serialization key
299 schema['serialization'] = core_schema.plain_serializer_function_ser_schema(encoder, when_used='json') 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
300 return schema 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
302 return schema 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
305def _get_first_non_null(a: Any, b: Any) -> Any: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
306 """Return the first argument if it is not None, otherwise return the second argument.
308 Use case: serialization_alias (argument a) and alias (argument b) are both defined, and serialization_alias is ''.
309 This function will return serialization_alias, which is the first argument, even though it is an empty string.
310 """
311 return a if a is not None else b 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
class InvalidSchemaError(Exception):
    """Exception signalling that a core schema is invalid."""
318class GenerateSchema: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
319 """Generate core schema for a Pydantic model, dataclass and types like `str`, `datetime`, ... ."""
321 __slots__ = ( 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
322 '_config_wrapper_stack',
323 '_ns_resolver',
324 '_typevars_map',
325 'field_name_stack',
326 'model_type_stack',
327 'defs',
328 )
330 def __init__( 1abdjoquwxCDghlmrsyzEFGOJKLMNPcifnptvABHI
331 self,
332 config_wrapper: ConfigWrapper,
333 ns_resolver: NsResolver | None = None,
334 typevars_map: Mapping[TypeVar, Any] | None = None,
335 ) -> None:
336 # we need a stack for recursing into nested models
337 self._config_wrapper_stack = ConfigWrapperStack(config_wrapper) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
338 self._ns_resolver = ns_resolver or NsResolver() 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
339 self._typevars_map = typevars_map 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
340 self.field_name_stack = _FieldNameStack() 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
341 self.model_type_stack = _ModelTypeStack() 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
342 self.defs = _Definitions() 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
344 def __init_subclass__(cls) -> None: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
345 super().__init_subclass__() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
346 warnings.warn( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
347 'Subclassing `GenerateSchema` is not supported. The API is highly subject to change in minor versions.',
348 UserWarning,
349 stacklevel=2,
350 )
352 @property 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
353 def _config_wrapper(self) -> ConfigWrapper: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
354 return self._config_wrapper_stack.tail 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
356 @property 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
357 def _types_namespace(self) -> NamespacesTuple: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
358 return self._ns_resolver.types_namespace 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
360 @property 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
361 def _arbitrary_types(self) -> bool: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
362 return self._config_wrapper.arbitrary_types_allowed 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
364 # the following methods can be overridden but should be considered
365 # unstable / private APIs
366 def _list_schema(self, items_type: Any) -> CoreSchema: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
367 return core_schema.list_schema(self.generate_schema(items_type)) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
369 def _dict_schema(self, keys_type: Any, values_type: Any) -> CoreSchema: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
370 return core_schema.dict_schema(self.generate_schema(keys_type), self.generate_schema(values_type)) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
372 def _set_schema(self, items_type: Any) -> CoreSchema: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
373 return core_schema.set_schema(self.generate_schema(items_type)) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
375 def _frozenset_schema(self, items_type: Any) -> CoreSchema: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
376 return core_schema.frozenset_schema(self.generate_schema(items_type)) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
378 def _enum_schema(self, enum_type: type[Enum]) -> CoreSchema: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
379 cases: list[Any] = list(enum_type.__members__.values()) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
381 enum_ref = get_type_ref(enum_type) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
382 description = None if not enum_type.__doc__ else inspect.cleandoc(enum_type.__doc__) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
383 if ( 1abke
384 description == 'An enumeration.'
385 ): # This is the default value provided by enum.EnumMeta.__new__; don't use it
386 description = None 1abdjkeghlmcifn
387 js_updates = {'title': enum_type.__name__, 'description': description} 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
388 js_updates = {k: v for k, v in js_updates.items() if v is not None} 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
390 sub_type: Literal['str', 'int', 'float'] | None = None 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
391 if issubclass(enum_type, int): 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
392 sub_type = 'int' 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
393 value_ser_type: core_schema.SerSchema = core_schema.simple_ser_schema('int') 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
394 elif issubclass(enum_type, str): 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
395 # this handles `StrEnum` (3.11 only), and also `Foobar(str, Enum)`
396 sub_type = 'str' 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
397 value_ser_type = core_schema.simple_ser_schema('str') 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
398 elif issubclass(enum_type, float): 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
399 sub_type = 'float' 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
400 value_ser_type = core_schema.simple_ser_schema('float') 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
401 else:
402 # TODO this is an ugly hack, how do we trigger an Any schema for serialization?
403 value_ser_type = core_schema.plain_serializer_function_ser_schema(lambda x: x) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
405 if cases: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
407 def get_json_schema(schema: CoreSchema, handler: GetJsonSchemaHandler) -> JsonSchemaValue: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
408 json_schema = handler(schema) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
409 original_schema = handler.resolve_ref_schema(json_schema) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
410 original_schema.update(js_updates) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
411 return json_schema 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
413 # we don't want to add the missing to the schema if it's the default one
414 default_missing = getattr(enum_type._missing_, '__func__', None) is Enum._missing_.__func__ # pyright: ignore[reportFunctionMemberAccess] 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
415 enum_schema = core_schema.enum_schema( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
416 enum_type,
417 cases,
418 sub_type=sub_type,
419 missing=None if default_missing else enum_type._missing_,
420 ref=enum_ref,
421 metadata={'pydantic_js_functions': [get_json_schema]},
422 )
424 if self._config_wrapper.use_enum_values: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
425 enum_schema = core_schema.no_info_after_validator_function( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
426 attrgetter('value'), enum_schema, serialization=value_ser_type
427 )
429 return enum_schema 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
431 else:
433 def get_json_schema_no_cases(_, handler: GetJsonSchemaHandler) -> JsonSchemaValue: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
434 json_schema = handler(core_schema.enum_schema(enum_type, cases, sub_type=sub_type, ref=enum_ref)) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
435 original_schema = handler.resolve_ref_schema(json_schema) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
436 original_schema.update(js_updates) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
437 return json_schema 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
439 # Use an isinstance check for enums with no cases.
440 # The most important use case for this is creating TypeVar bounds for generics that should
441 # be restricted to enums. This is more consistent than it might seem at first, since you can only
442 # subclass enum.Enum (or subclasses of enum.Enum) if all parent classes have no cases.
443 # We use the get_json_schema function when an Enum subclass has been declared with no cases
444 # so that we can still generate a valid json schema.
445 return core_schema.is_instance_schema( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
446 enum_type,
447 metadata={'pydantic_js_functions': [get_json_schema_no_cases]},
448 )
450 def _ip_schema(self, tp: Any) -> CoreSchema: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
451 from ._validators import IP_VALIDATOR_LOOKUP, IpType 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
453 ip_type_json_schema_format: dict[type[IpType], str] = { 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
454 IPv4Address: 'ipv4',
455 IPv4Network: 'ipv4network',
456 IPv4Interface: 'ipv4interface',
457 IPv6Address: 'ipv6',
458 IPv6Network: 'ipv6network',
459 IPv6Interface: 'ipv6interface',
460 }
462 def ser_ip(ip: Any, info: core_schema.SerializationInfo) -> str | IpType: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
463 if not isinstance(ip, (tp, str)): 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
464 raise PydanticSerializationUnexpectedValue( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
465 f"Expected `{tp}` but got `{type(ip)}` with value `'{ip}'` - serialized value may not be as expected."
466 )
467 if info.mode == 'python': 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
468 return ip 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
469 return str(ip) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
471 return core_schema.lax_or_strict_schema( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
472 lax_schema=core_schema.no_info_plain_validator_function(IP_VALIDATOR_LOOKUP[tp]),
473 strict_schema=core_schema.json_or_python_schema(
474 json_schema=core_schema.no_info_after_validator_function(tp, core_schema.str_schema()),
475 python_schema=core_schema.is_instance_schema(tp),
476 ),
477 serialization=core_schema.plain_serializer_function_ser_schema(ser_ip, info_arg=True, when_used='always'),
478 metadata={
479 'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': ip_type_json_schema_format[tp]}]
480 },
481 )
483 def _path_schema(self, tp: Any, path_type: Any) -> CoreSchema: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
484 if tp is os.PathLike and (path_type not in {str, bytes} and not typing_objects.is_any(path_type)): 484 ↛ 485line 484 didn't jump to line 485 because the condition on line 484 was never true1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
485 raise PydanticUserError(
486 '`os.PathLike` can only be used with `str`, `bytes` or `Any`', code='schema-for-unknown-type'
487 )
489 path_constructor = pathlib.PurePath if tp is os.PathLike else tp 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
490 strict_inner_schema = ( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
491 core_schema.bytes_schema(strict=True) if (path_type is bytes) else core_schema.str_schema(strict=True)
492 )
493 lax_inner_schema = core_schema.bytes_schema() if (path_type is bytes) else core_schema.str_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
495 def path_validator(input_value: str | bytes) -> os.PathLike[Any]: # type: ignore 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
496 try: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
497 if path_type is bytes: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
498 if isinstance(input_value, bytes): 498 ↛ 504line 498 didn't jump to line 504 because the condition on line 498 was always true1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
499 try: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
500 input_value = input_value.decode() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
501 except UnicodeDecodeError as e:
502 raise PydanticCustomError('bytes_type', 'Input must be valid bytes') from e
503 else:
504 raise PydanticCustomError('bytes_type', 'Input must be bytes')
505 elif not isinstance(input_value, str): 505 ↛ 506line 505 didn't jump to line 506 because the condition on line 505 was never true1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
506 raise PydanticCustomError('path_type', 'Input is not a valid path')
508 return path_constructor(input_value) # type: ignore 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
509 except TypeError as e:
510 raise PydanticCustomError('path_type', 'Input is not a valid path') from e
512 def ser_path(path: Any, info: core_schema.SerializationInfo) -> str | os.PathLike[Any]: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
513 if not isinstance(path, (tp, str)): 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
514 raise PydanticSerializationUnexpectedValue( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
515 f"Expected `{tp}` but got `{type(path)}` with value `'{path}'` - serialized value may not be as expected."
516 )
517 if info.mode == 'python': 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
518 return path 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
519 return str(path) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
521 instance_schema = core_schema.json_or_python_schema( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
522 json_schema=core_schema.no_info_after_validator_function(path_validator, lax_inner_schema),
523 python_schema=core_schema.is_instance_schema(tp),
524 )
526 schema = core_schema.lax_or_strict_schema( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
527 lax_schema=core_schema.union_schema(
528 [
529 instance_schema,
530 core_schema.no_info_after_validator_function(path_validator, strict_inner_schema),
531 ],
532 custom_error_type='path_type',
533 custom_error_message=f'Input is not a valid path for {tp}',
534 ),
535 strict_schema=instance_schema,
536 serialization=core_schema.plain_serializer_function_ser_schema(ser_path, info_arg=True, when_used='always'),
537 metadata={'pydantic_js_functions': [lambda source, handler: {**handler(source), 'format': 'path'}]},
538 )
539 return schema 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
def _deque_schema(self, items_type: Any) -> CoreSchema:
    """Build the core schema for a `collections.deque` with items of `items_type`.

    In lax mode the input is validated with a non-strict list schema wrapped by
    `deque_validator`. In strict mode an instance check runs first (for Python
    input: `collections.deque`; for JSON input: the list schema), followed by the
    same lax schema. Serialization goes through `serialize_sequence_via_list`.
    """
    from ._serializers import serialize_sequence_via_list
    from ._validators import deque_validator

    element_schema = self.generate_schema(items_type)

    # The items are validated through a list schema, which must be lax:
    # the deque being validated is not itself a list.
    lax_list = core_schema.list_schema(element_schema, strict=False)

    deque_instance_gate = core_schema.json_or_python_schema(
        json_schema=lax_list,
        python_schema=core_schema.is_instance_schema(collections.deque, cls_repr='Deque'),
    )
    coerce_to_deque = core_schema.no_info_wrap_validator_function(deque_validator, lax_list)
    strict_pipeline = core_schema.chain_schema([deque_instance_gate, coerce_to_deque])
    list_serializer = core_schema.wrap_serializer_function_ser_schema(
        serialize_sequence_via_list, schema=element_schema, info_arg=True
    )

    return core_schema.lax_or_strict_schema(
        lax_schema=coerce_to_deque,
        strict_schema=strict_pipeline,
        serialization=list_serializer,
    )
def _mapping_schema(self, tp: Any, keys_type: Any, values_type: Any) -> CoreSchema:
    """Build the core schema for the mapping type `tp`.

    `tp` is looked up in `MAPPING_ORIGIN_MAP` to find the runtime class. When that
    class is `dict`, the non-strict dict schema is returned as is. Otherwise the
    dict schema is wrapped so that the validated value is coerced to the runtime
    class (with a dedicated validator for `collections.defaultdict`); strict mode
    additionally requires an instance check before coercion.
    """
    from ._validators import MAPPING_ORIGIN_MAP, defaultdict_validator, get_defaultdict_default_default_factory

    runtime_origin = MAPPING_ORIGIN_MAP[tp]
    key_schema = self.generate_schema(keys_type)
    value_schema = self.generate_schema(values_type)
    plain_dict_schema = core_schema.dict_schema(key_schema, value_schema, strict=False)

    # Plain dicts need neither an instance check nor coercion.
    if runtime_origin is dict:
        return plain_dict_schema

    instance_gate = core_schema.json_or_python_schema(
        json_schema=plain_dict_schema,
        python_schema=core_schema.is_instance_schema(runtime_origin),
    )

    if tp is collections.defaultdict:
        fallback_factory = get_defaultdict_default_default_factory(values_type)
        coercing_schema = core_schema.no_info_wrap_validator_function(
            partial(defaultdict_validator, default_default_factory=fallback_factory),
            plain_dict_schema,
        )
    else:
        # The runtime class itself is used as the "after" validator.
        coercing_schema = core_schema.no_info_after_validator_function(runtime_origin, plain_dict_schema)

    return core_schema.lax_or_strict_schema(
        lax_schema=coercing_schema,
        strict_schema=core_schema.chain_schema([instance_gate, coercing_schema]),
        serialization=core_schema.wrap_serializer_function_ser_schema(
            lambda v, h: h(v), schema=plain_dict_schema, info_arg=False
        ),
    )
def _fraction_schema(self) -> CoreSchema:
    """Support for [`fractions.Fraction`][fractions.Fraction].

    Lax mode always runs `fraction_validator`. Strict mode runs it for JSON input
    only and requires a `Fraction` instance for Python input.
    """
    from ._validators import fraction_validator

    def fraction_json_schema(_source: Any, _handler: Any) -> dict[str, str]:
        return {'type': 'string', 'format': 'fraction'}

    # TODO: the lax / strict split for attempted type coercion is a recurring pattern;
    # a helper function could reduce this boilerplate.
    lax = core_schema.no_info_plain_validator_function(fraction_validator)
    strict = core_schema.json_or_python_schema(
        json_schema=core_schema.no_info_plain_validator_function(fraction_validator),
        python_schema=core_schema.is_instance_schema(Fraction),
    )
    # String serialization guarantees round trip behavior.
    as_string = core_schema.to_string_ser_schema(when_used='always')

    return core_schema.lax_or_strict_schema(
        lax_schema=lax,
        strict_schema=strict,
        serialization=as_string,
        metadata={'pydantic_js_functions': [fraction_json_schema]},
    )
def _arbitrary_type_schema(self, tp: Any) -> CoreSchema:
    """Return an instance-check schema for `tp`, or an "any" schema if `tp` is not a class.

    When `tp` is not a Python type, a `UserWarning` is emitted and no validation
    is performed at all.
    """
    if isinstance(tp, type):
        return core_schema.is_instance_schema(tp)

    message = (
        f'{tp!r} is not a Python type (it may be an instance of an object),'
        ' Pydantic will allow any object with no validation since we cannot even'
        ' enforce that the input is an instance of the given type.'
        ' To get rid of this error wrap the type with `pydantic.SkipValidation`.'
    )
    warn(message, UserWarning)
    return core_schema.any_schema()
def _unknown_type_schema(self, obj: Any) -> CoreSchema:
    """Always raise `PydanticSchemaGenerationError` for `obj`.

    The error message tells the user how to either ignore the problem or add
    proper support for the type.
    """
    explanation = (
        f'Unable to generate pydantic-core schema for {obj!r}. '
        'Set `arbitrary_types_allowed=True` in the model_config to ignore this error'
        ' or implement `__get_pydantic_core_schema__` on your type to fully support it.'
        '\n\nIf you got this error by calling handler(<some type>) within'
        ' `__get_pydantic_core_schema__` then you likely need to call'
        ' `handler.generate_schema(<some type>)` since we do not call'
        ' `__get_pydantic_core_schema__` on `<some type>` otherwise to avoid infinite recursion.'
    )
    raise PydanticSchemaGenerationError(explanation)
644 def _apply_discriminator_to_union( 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
645 self, schema: CoreSchema, discriminator: str | Discriminator | None
646 ) -> CoreSchema:
647 if discriminator is None: 647 ↛ 648line 647 didn't jump to line 648 because the condition on line 647 was never true1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
648 return schema
649 try: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
650 return _discriminated_union.apply_discriminator( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
651 schema,
652 discriminator,
653 self.defs._definitions,
654 )
655 except _discriminated_union.MissingDefinitionForUnionRef: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
656 # defer until defs are resolved
657 _discriminated_union.set_discriminator_in_metadata( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
658 schema,
659 discriminator,
660 )
661 return schema 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
def clean_schema(self, schema: CoreSchema) -> CoreSchema:
    """Finalize `schema` through the definitions store and validate the result."""
    finalized = self.defs.finalize_schema(schema)
    return validate_core_schema(finalized)
668 def _add_js_function(self, metadata_schema: CoreSchema, js_function: Callable[..., Any]) -> None: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
669 metadata = metadata_schema.get('metadata', {}) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
670 pydantic_js_functions = metadata.setdefault('pydantic_js_functions', []) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
671 # because of how we generate core schemas for nested generic models
672 # we can end up adding `BaseModel.__get_pydantic_json_schema__` multiple times
673 # this check may fail to catch duplicates if the function is a `functools.partial`
674 # or something like that, but if it does it'll fail by inserting the duplicate
675 if js_function not in pydantic_js_functions: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
676 pydantic_js_functions.append(js_function) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
677 metadata_schema['metadata'] = metadata 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
def generate_schema(
    self,
    obj: Any,
) -> core_schema.CoreSchema:
    """Generate core schema.

    The schema is first requested from the object's own schema hook; if that
    yields nothing, the internal generation path is used. A JSON schema hook
    found on the object is then attached to the schema metadata, and custom
    serialization from the config's `json_encoders` is applied last.

    Args:
        obj: The object to generate core schema for.

    Returns:
        The generated core schema.

    Raises:
        PydanticUndefinedAnnotation:
            If it is not possible to evaluate forward reference.
        PydanticSchemaGenerationError:
            If it is not possible to generate pydantic-core schema.
        TypeError:
            - If `alias_generator` returns a disallowed type (must be str, AliasPath or AliasChoices).
            - If V1 style validator with `each_item=True` applied on a wrong field.
        PydanticUserError:
            - If `typing.TypedDict` is used instead of `typing_extensions.TypedDict` on Python < 3.12.
            - If `__modify_schema__` method is used instead of `__get_pydantic_json_schema__`.
    """
    generated = self._generate_schema_from_get_schema_method(obj, obj)
    if generated is None:
        generated = self._generate_schema_inner(obj)

    js_function = _extract_get_pydantic_json_schema(obj)
    if js_function is not None:
        # The JSON schema hook is attached to the resolved original schema, if any.
        target_schema = resolve_original_schema(generated, self.defs)
        if target_schema:
            self._add_js_function(target_schema, js_function)

    return _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, obj, generated)
def _model_schema(self, cls: type[BaseModel]) -> core_schema.CoreSchema:
    """Generate schema for a Pydantic model.

    An existing schema is reused when possible: either the one yielded by
    `self.defs.get_schema_or_ref(cls)`, or a `__pydantic_core_schema__` stored in
    the class' own `__dict__` (as long as it is not a `MockCoreSchema`). Otherwise
    the schema is built from the model fields, decorators and config.

    Args:
        cls: The model class to generate a schema for.

    Returns:
        The existing schema if one was found, otherwise the result of registering
        the newly built model schema with `self.defs.create_definition_reference_schema`.

    Raises:
        PydanticUndefinedAnnotation: If `cls` has no `__pydantic_fields__` while its
            fields are not complete, or if rebuilding the fields raises a `NameError`.
        PydanticSchemaGenerationError: If the `__pydantic_extra__` annotation origin
            is not one of `DICT_TYPES`.
    """
    BaseModel_ = import_cached_base_model()

    with self.defs.get_schema_or_ref(cls) as (model_ref, maybe_schema):
        if maybe_schema is not None:
            return maybe_schema

        # Only look in the class' own `__dict__`, so a schema inherited from a parent is not picked up.
        schema = cls.__dict__.get('__pydantic_core_schema__')
        if schema is not None and not isinstance(schema, MockCoreSchema):
            if schema['type'] == 'definitions':
                schema = self.defs.unpack_definitions(schema)
            ref = get_ref(schema)
            if ref:
                return self.defs.create_definition_reference_schema(schema)
            else:
                return schema

        config_wrapper = ConfigWrapper(cls.model_config, check=False)

        # The model's config and namespace stay active for the whole schema build below.
        with self._config_wrapper_stack.push(config_wrapper), self._ns_resolver.push(cls):
            core_config = self._config_wrapper.core_config(title=cls.__name__)

            if cls.__pydantic_fields_complete__ or cls is BaseModel_:
                fields = getattr(cls, '__pydantic_fields__', {})
            else:
                if not hasattr(cls, '__pydantic_fields__'):
                    # This happens when we have a loop in the schema generation:
                    # class Base[T](BaseModel):
                    #     t: T
                    #
                    # class Other(BaseModel):
                    #     b: 'Base[Other]'
                    # When we build fields for `Other`, we evaluate the forward annotation.
                    # At this point, `Other` doesn't have the model fields set. We create
                    # `Base[Other]`; model fields are successfully built, and we try to generate
                    # a schema for `t: Other`. As `Other.__pydantic_fields__` aren't set, we abort.
                    raise PydanticUndefinedAnnotation(
                        name=cls.__name__,
                        message=f'Class {cls.__name__!r} is not defined',
                    )
                try:
                    fields = rebuild_model_fields(
                        cls,
                        ns_resolver=self._ns_resolver,
                        typevars_map=self._typevars_map or {},
                    )
                except NameError as e:
                    raise PydanticUndefinedAnnotation.from_name_error(e) from e

            decorators = cls.__pydantic_decorators__
            computed_fields = decorators.computed_fields
            # Field-level decorators are checked against the names of both regular and computed fields.
            check_decorator_fields_exist(
                chain(
                    decorators.field_validators.values(),
                    decorators.field_serializers.values(),
                    decorators.validators.values(),
                ),
                {*fields.keys(), *computed_fields.keys()},
            )

            model_validators = decorators.model_validators.values()

            extras_schema = None
            extras_keys_schema = None
            if core_config.get('extra_fields_behavior') == 'allow':
                assert cls.__mro__[0] is cls
                assert cls.__mro__[-1] is object
                # Walk the MRO (excluding `object`) and stop at the first class whose
                # `__pydantic_extra__` annotation yields a keys or values schema.
                for candidate_cls in cls.__mro__[:-1]:
                    extras_annotation = getattr(candidate_cls, '__annotations__', {}).get(
                        '__pydantic_extra__', None
                    )
                    if extras_annotation is not None:
                        if isinstance(extras_annotation, str):
                            # String annotations are evaluated first.
                            extras_annotation = _typing_extra.eval_type_backport(
                                _typing_extra._make_forward_ref(
                                    extras_annotation, is_argument=False, is_class=True
                                ),
                                *self._types_namespace,
                            )
                        tp = get_origin(extras_annotation)
                        if tp not in DICT_TYPES:
                            raise PydanticSchemaGenerationError(
                                'The type annotation for `__pydantic_extra__` must be `dict[str, ...]`'
                            )
                        extra_keys_type, extra_items_type = self._get_args_resolving_forward_refs(
                            extras_annotation,
                            required=True,
                        )
                        # `str` keys and `Any` values produce no dedicated schema.
                        if extra_keys_type is not str:
                            extras_keys_schema = self.generate_schema(extra_keys_type)
                        if not typing_objects.is_any(extra_items_type):
                            extras_schema = self.generate_schema(extra_items_type)
                        if extras_keys_schema is not None or extras_schema is not None:
                            break

            generic_origin: type[BaseModel] | None = getattr(cls, '__pydantic_generic_metadata__', {}).get('origin')

            if cls.__pydantic_root_model__:
                # Root models wrap the schema of their single `root` field.
                root_field = self._common_field_schema('root', fields['root'], decorators)
                inner_schema = root_field['schema']
                inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')
                model_schema = core_schema.model_schema(
                    cls,
                    inner_schema,
                    generic_origin=generic_origin,
                    custom_init=getattr(cls, '__pydantic_custom_init__', None),
                    root_model=True,
                    post_init=getattr(cls, '__pydantic_post_init__', None),
                    config=core_config,
                    ref=model_ref,
                )
            else:
                fields_schema: core_schema.CoreSchema = core_schema.model_fields_schema(
                    {k: self._generate_md_field_schema(k, v, decorators) for k, v in fields.items()},
                    computed_fields=[
                        self._computed_field_schema(d, decorators.field_serializers)
                        for d in computed_fields.values()
                    ],
                    extras_schema=extras_schema,
                    extras_keys_schema=extras_keys_schema,
                    model_name=cls.__name__,
                )
                # Root validators are applied first, then the 'inner' model validators.
                inner_schema = apply_validators(fields_schema, decorators.root_validators.values(), None)
                inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')

                model_schema = core_schema.model_schema(
                    cls,
                    inner_schema,
                    generic_origin=generic_origin,
                    custom_init=getattr(cls, '__pydantic_custom_init__', None),
                    root_model=False,
                    post_init=getattr(cls, '__pydantic_post_init__', None),
                    config=core_config,
                    ref=model_ref,
                )

            # Model serializers and the 'outer' model validators wrap the complete model schema.
            schema = self._apply_model_serializers(model_schema, decorators.model_serializers.values())
            schema = apply_model_validators(schema, model_validators, 'outer')
            return self.defs.create_definition_reference_schema(schema)
859 def _resolve_self_type(self, obj: Any) -> Any: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
860 obj = self.model_type_stack.get() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
861 if obj is None: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
862 raise PydanticUserError('`typing.Self` is invalid in this context', code='invalid-self-type') 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
863 return obj 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
def _generate_schema_from_get_schema_method(self, obj: Any, source: Any) -> core_schema.CoreSchema | None:
    """Generate a schema from the hooks defined on `obj`, if any.

    Two hooks are considered, in this order:

    - `__get_pydantic_core_schema__`, unless it is the implementation defined on `BaseModel`.
    - `__get_validators__`, only when `__get_pydantic_core_schema__` is absent. A warning
      is emitted in this case.

    Args:
        obj: The object whose hooks are looked up.
        source: The object passed as first argument to `__get_pydantic_core_schema__`.

    Returns:
        The schema produced by one of the hooks, or `None` if neither hook applies.
    """
    BaseModel_ = import_cached_base_model()

    get_schema = getattr(obj, '__get_pydantic_core_schema__', None)
    # Compare the underlying function to detect the `BaseModel` implementation.
    is_base_model_get_schema = (
        getattr(get_schema, '__func__', None) is BaseModel_.__get_pydantic_core_schema__.__func__  # pyright: ignore[reportFunctionMemberAccess]
    )

    if (
        get_schema is not None
        # BaseModel.__get_pydantic_core_schema__ is defined for backwards compatibility,
        # to allow existing code to call `super().__get_pydantic_core_schema__` in Pydantic
        # model that overrides `__get_pydantic_core_schema__`. However, it raises a deprecation
        # warning stating that the method will be removed, and during the core schema gen we actually
        # don't call the method:
        and not is_base_model_get_schema
    ):
        # Some referenceable types might have a `__get_pydantic_core_schema__` method
        # defined on it by users (e.g. on a dataclass). This generally doesn't play well
        # as these types are already recognized by the `GenerateSchema` class and isn't ideal
        # as we might end up calling `get_schema_or_ref` (expensive) on types that are actually
        # not referenceable:
        with self.defs.get_schema_or_ref(obj) as (_, maybe_schema):
            if maybe_schema is not None:
                return maybe_schema

        # The handler's reference mode depends on whether the hook owner is the source itself.
        if obj is source:
            ref_mode = 'unpack'
        else:
            ref_mode = 'to-def'
        schema = get_schema(
            source, CallbackGetCoreSchemaHandler(self._generate_schema_inner, self, ref_mode=ref_mode)
        )
        if schema['type'] == 'definitions':
            schema = self.defs.unpack_definitions(schema)

        ref = get_ref(schema)
        if ref:
            return self.defs.create_definition_reference_schema(schema)

        # Note: if schema is of type `'definition-ref'`, we might want to copy it as a
        # safety measure (because these are inlined in place -- i.e. mutated directly)
        return schema

    if get_schema is None and (validators := getattr(obj, '__get_validators__', None)) is not None:
        from pydantic.v1 import BaseModel as BaseModelV1

        # NOTE(review): `issubclass` raises `TypeError` when `obj` is not a class --
        # presumably only classes reach this point; confirm against callers.
        if issubclass(obj, BaseModelV1):
            warn(
                f'Mixing V1 models and V2 models (or constructs, like `TypeAdapter`) is not supported. Please upgrade `{obj.__name__}` to V2.',
                UserWarning,
            )
        else:
            warn(
                '`__get_validators__` is deprecated and will be removed, use `__get_pydantic_core_schema__` instead.',
                PydanticDeprecatedSince20,
            )
        # Each yielded validator becomes one step of a chain schema.
        return core_schema.chain_schema([core_schema.with_info_plain_validator_function(v) for v in validators()])
    # Falls through and implicitly returns `None` when no hook applies.
def _resolve_forward_ref(self, obj: Any) -> Any:
    """Evaluate the forward reference `obj` using the current types namespace.

    Raises `PydanticUndefinedAnnotation` when evaluation raises a `NameError` or
    when the result is still a `ForwardRef`. Type variables in the result are
    replaced when a typevars map is set.
    """
    # The types namespace is assumed to contain the target of the forward reference.
    # This can fail, e.g. when calling Validator on an imported type containing
    # forward references to types only defined in the module it was imported from:
    #     `Validator(SomeImportedTypeAliasWithAForwardReference)`
    # or the equivalent for BaseModel:
    #     class Model(BaseModel):
    #         x: SomeImportedTypeAliasWithAForwardReference
    try:
        evaluated = _typing_extra.eval_type_backport(obj, *self._types_namespace)
    except NameError as exc:
        raise PydanticUndefinedAnnotation.from_name_error(exc) from exc

    # Still a ForwardRef: it could not be evaluated.
    if isinstance(evaluated, ForwardRef):
        raise PydanticUndefinedAnnotation(evaluated.__forward_arg__, f'Unable to evaluate forward reference {evaluated}')

    if not self._typevars_map:
        return evaluated
    return replace_types(evaluated, self._typevars_map)
946 @overload 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
947 def _get_args_resolving_forward_refs(self, obj: Any, required: Literal[True]) -> tuple[Any, ...]: ... 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
949 @overload 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
950 def _get_args_resolving_forward_refs(self, obj: Any) -> tuple[Any, ...] | None: ... 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
952 def _get_args_resolving_forward_refs(self, obj: Any, required: bool = False) -> tuple[Any, ...] | None: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
953 args = get_args(obj) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
954 if args: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
955 if isinstance(obj, GenericAlias): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
956 # PEP 585 generic aliases don't convert args to ForwardRefs, unlike `typing.List/Dict` etc.
957 args = (_typing_extra._make_forward_ref(a) if isinstance(a, str) else a for a in args) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
958 args = tuple(self._resolve_forward_ref(a) if isinstance(a, ForwardRef) else a for a in args) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
959 elif required: # pragma: no cover 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
960 raise TypeError(f'Expected {obj} to have generic parameters but it had none')
961 return args 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
963 def _get_first_arg_or_any(self, obj: Any) -> Any: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
964 args = self._get_args_resolving_forward_refs(obj) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
965 if not args: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
966 return Any 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
967 return args[0] 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
969 def _get_first_two_args_or_any(self, obj: Any) -> tuple[Any, Any]: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
970 args = self._get_args_resolving_forward_refs(obj) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
971 if not args: 971 ↛ 972line 971 didn't jump to line 972 because the condition on line 971 was never true1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
972 return (Any, Any)
973 if len(args) < 2: 973 ↛ 974line 973 didn't jump to line 974 because the condition on line 973 was never true1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
974 origin = get_origin(obj)
975 raise TypeError(f'Expected two type arguments for {origin}, got 1')
976 return args[0], args[1] 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
def _generate_schema_inner(self, obj: Any) -> core_schema.CoreSchema:
    """Generate a core schema for `obj`, handling the special forms before `match_type`.

    Handled here, in order: `Self`, `Annotated`, dicts (returned unchanged),
    strings and forward references, Pydantic models and recursive references.
    Everything else is delegated to `match_type`.
    """
    target = self._resolve_self_type(obj) if typing_objects.is_self(obj) else obj

    if typing_objects.is_annotated(get_origin(target)):
        return self._annotated_schema(target)

    if isinstance(target, dict):
        # A dict is assumed to already be a valid schema.
        return target  # type: ignore[return-value]

    if isinstance(target, (str, ForwardRef)):
        # Strings are treated as forward references.
        forward_ref = ForwardRef(target) if isinstance(target, str) else target
        resolved = self._resolve_forward_ref(forward_ref)
        return self.generate_schema(resolved)

    model_base = import_cached_base_model()
    if lenient_issubclass(target, model_base):
        with self.model_type_stack.push(target):
            return self._model_schema(target)

    if isinstance(target, PydanticRecursiveRef):
        return core_schema.definition_reference_schema(schema_ref=target.type_ref)

    return self.match_type(target)
1006 def match_type(self, obj: Any) -> core_schema.CoreSchema: # noqa: C901 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1007 """Main mapping of types to schemas.
1009 The general structure is a series of if statements starting with the simple cases
1010 (non-generic primitive types) and then handling generics and other more complex cases.
1012 Each case either generates a schema directly, calls into a public user-overridable method
1013 (like `GenerateSchema.tuple_variable_schema`) or calls into a private method that handles some
1014 boilerplate before calling into the user-facing method (e.g. `GenerateSchema._tuple_schema`).
1016 The idea is that we'll evolve this into adding more and more user facing methods over time
1017 as they get requested and we figure out what the right API for them is.
1018 """
1019 if obj is str: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1020 return core_schema.str_schema() 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1021 elif obj is bytes: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1022 return core_schema.bytes_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1023 elif obj is int: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1024 return core_schema.int_schema() 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1025 elif obj is float: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1026 return core_schema.float_schema() 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1027 elif obj is bool: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1028 return core_schema.bool_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1029 elif obj is complex: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1030 return core_schema.complex_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1031 elif typing_objects.is_any(obj) or obj is object: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1032 return core_schema.any_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1033 elif obj is datetime.date: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1034 return core_schema.date_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1035 elif obj is datetime.datetime: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1036 return core_schema.datetime_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1037 elif obj is datetime.time: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1038 return core_schema.time_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1039 elif obj is datetime.timedelta: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1040 return core_schema.timedelta_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1041 elif obj is Decimal: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1042 return core_schema.decimal_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1043 elif obj is UUID: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1044 return core_schema.uuid_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1045 elif obj is Url: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1046 return core_schema.url_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1047 elif obj is Fraction: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1048 return self._fraction_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1049 elif obj is MultiHostUrl: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1050 return core_schema.multi_host_url_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1051 elif obj is None or obj is _typing_extra.NoneType: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1052 return core_schema.none_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1053 elif obj in IP_TYPES: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1054 return self._ip_schema(obj) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1055 elif obj in TUPLE_TYPES: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1056 return self._tuple_schema(obj) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1057 elif obj in LIST_TYPES: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1058 return self._list_schema(Any) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1059 elif obj in SET_TYPES: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1060 return self._set_schema(Any) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1061 elif obj in FROZEN_SET_TYPES: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1062 return self._frozenset_schema(Any) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1063 elif obj in SEQUENCE_TYPES: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1064 return self._sequence_schema(Any) 1adouxcfpvB
1065 elif obj in ITERABLE_TYPES: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1066 return self._iterable_schema(obj) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1067 elif obj in DICT_TYPES: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1068 return self._dict_schema(Any, Any) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1069 elif obj in PATH_TYPES: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1070 return self._path_schema(obj, Any) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1071 elif obj in DEQUE_TYPES: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1072 return self._deque_schema(Any) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1073 elif obj in MAPPING_TYPES: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1074 return self._mapping_schema(obj, Any, Any) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1075 elif obj in COUNTER_TYPES: 1075 ↛ 1076line 1075 didn't jump to line 1076 because the condition on line 1075 was never true1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1076 return self._mapping_schema(obj, Any, int)
1077 elif typing_objects.is_typealiastype(obj): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1078 return self._type_alias_type_schema(obj) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1079 elif obj is type: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1080 return self._type_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1081 elif _typing_extra.is_callable(obj): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1082 return core_schema.callable_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1083 elif typing_objects.is_literal(get_origin(obj)): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1084 return self._literal_schema(obj) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1085 elif is_typeddict(obj): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1086 return self._typed_dict_schema(obj, None) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1087 elif _typing_extra.is_namedtuple(obj): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1088 return self._namedtuple_schema(obj, None) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1089 elif typing_objects.is_newtype(obj): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1090 # NewType, can't use isinstance because it fails <3.10
1091 return self.generate_schema(obj.__supertype__) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1092 elif obj in PATTERN_TYPES: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1093 return self._pattern_schema(obj) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1094 elif _typing_extra.is_hashable(obj): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1095 return self._hashable_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1096 elif isinstance(obj, typing.TypeVar): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1097 return self._unsubstituted_typevar_schema(obj) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1098 elif _typing_extra.is_finalvar(obj): 1098 ↛ 1099line 1098 didn't jump to line 1099 because the condition on line 1098 was never true1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1099 if obj is Final:
1100 return core_schema.any_schema()
1101 return self.generate_schema(
1102 self._get_first_arg_or_any(obj),
1103 )
1104 elif isinstance(obj, VALIDATE_CALL_SUPPORTED_TYPES): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1105 return self._call_schema(obj) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1106 elif inspect.isclass(obj) and issubclass(obj, Enum): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1107 return self._enum_schema(obj) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1108 elif obj is ZoneInfo: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1109 return self._zoneinfo_schema() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1111 # dataclasses.is_dataclass coerces dc instances to types, but we only handle
1112 # the case of a dc type here
1113 if dataclasses.is_dataclass(obj): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1114 return self._dataclass_schema(obj, None) # pyright: ignore[reportArgumentType] 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1116 origin = get_origin(obj) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1117 if origin is not None: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1118 return self._match_generic_type(obj, origin) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
1120 if self._arbitrary_types: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1121 return self._arbitrary_type_schema(obj) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
1122 return self._unknown_type_schema(obj) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
def _match_generic_type(self, obj: Any, origin: Any) -> CoreSchema:  # noqa: C901
    """Generate the core schema for a parametrized type, dispatching on its origin.

    Args:
        obj: The parametrized type (e.g. `list[int]`).
        origin: The origin of `obj`; the caller passes `get_origin(obj)`.

    Returns:
        The schema produced by the first matching handler. If nothing matches, an
        arbitrary-type schema for `origin` when arbitrary types are enabled, otherwise
        the "unknown type" schema for `obj`.
    """
    # Need to handle generic dataclasses before looking for the schema properties because attribute accesses
    # on _GenericAlias delegate to the origin type, so lose the information about the concrete parametrization
    # As a result, currently, there is no way to cache the schema for generic dataclasses. This may be possible
    # to resolve by modifying the value returned by `Generic.__class_getitem__`, but that is a dangerous game.
    if dataclasses.is_dataclass(origin):
        return self._dataclass_schema(obj, origin)  # pyright: ignore[reportArgumentType]
    if _typing_extra.is_namedtuple(origin):
        return self._namedtuple_schema(obj, origin)

    # A schema provided through the "get schema" hook takes precedence over the built-in handlers below.
    custom_schema = self._generate_schema_from_get_schema_method(origin, obj)
    if custom_schema is not None:
        return custom_schema

    # The order of the checks below is significant: the first match wins.
    if typing_objects.is_typealiastype(origin):
        return self._type_alias_type_schema(obj)
    if is_union_origin(origin):
        return self._union_schema(obj)
    if origin in TUPLE_TYPES:
        return self._tuple_schema(obj)
    if origin in LIST_TYPES:
        return self._list_schema(self._get_first_arg_or_any(obj))
    if origin in SET_TYPES:
        return self._set_schema(self._get_first_arg_or_any(obj))
    if origin in FROZEN_SET_TYPES:
        return self._frozenset_schema(self._get_first_arg_or_any(obj))
    if origin in DICT_TYPES:
        return self._dict_schema(*self._get_first_two_args_or_any(obj))
    if origin in PATH_TYPES:
        return self._path_schema(origin, self._get_first_arg_or_any(obj))
    if origin in DEQUE_TYPES:
        return self._deque_schema(self._get_first_arg_or_any(obj))
    if origin in MAPPING_TYPES:
        return self._mapping_schema(origin, *self._get_first_two_args_or_any(obj))
    if origin in COUNTER_TYPES:
        # counters always map to `int` values, only the key type is parametrized
        return self._mapping_schema(origin, self._get_first_arg_or_any(obj), int)
    if is_typeddict(origin):
        return self._typed_dict_schema(obj, origin)
    if origin in TYPE_TYPES:
        return self._subclass_schema(obj)
    if origin in SEQUENCE_TYPES:
        return self._sequence_schema(self._get_first_arg_or_any(obj))
    if origin in ITERABLE_TYPES:
        return self._iterable_schema(obj)
    if origin in PATTERN_TYPES:
        return self._pattern_schema(obj)

    # Nothing matched: fall back on the arbitrary-type handling if enabled.
    if not self._arbitrary_types:
        return self._unknown_type_schema(obj)
    return self._arbitrary_type_schema(origin)
def _generate_td_field_schema(
    self,
    name: str,
    field_info: FieldInfo,
    decorators: DecoratorInfos,
    *,
    required: bool = True,
) -> core_schema.TypedDictField:
    """Prepare a TypedDictField to represent a model or typeddict field.

    Args:
        name: The name of the field.
        field_info: The `FieldInfo` describing the field.
        decorators: The decorators collected on the owning class.
        required: Whether the key is required; ignored (treated as `False`) when
            `field_info.is_required()` is false.
    """
    shared = self._common_field_schema(name, field_info, decorators)
    # a field that is not required according to its `FieldInfo` is never marked as required
    is_required = required if field_info.is_required() else False
    return core_schema.typed_dict_field(
        shared['schema'],
        required=is_required,
        serialization_exclude=shared['serialization_exclude'],
        validation_alias=shared['validation_alias'],
        serialization_alias=shared['serialization_alias'],
        metadata=shared['metadata'],
    )
def _generate_md_field_schema(
    self,
    name: str,
    field_info: FieldInfo,
    decorators: DecoratorInfos,
) -> core_schema.ModelField:
    """Prepare a ModelField to represent a model field.

    The schema and the remaining attributes are all taken from the common field
    definition built by `_common_field_schema()`.
    """
    shared = self._common_field_schema(name, field_info, decorators)
    # keys of the common field that are forwarded unchanged as keyword arguments
    forwarded_keys = ('serialization_exclude', 'validation_alias', 'serialization_alias', 'frozen', 'metadata')
    return core_schema.model_field(shared['schema'], **{key: shared[key] for key in forwarded_keys})
def _generate_dc_field_schema(
    self,
    name: str,
    field_info: FieldInfo,
    decorators: DecoratorInfos,
) -> core_schema.DataclassField:
    """Prepare a DataclassField to represent the parameter/field, of a dataclass.

    Args:
        name: The name of the dataclass field.
        field_info: The `FieldInfo` describing the field.
        decorators: The decorators collected on the owning dataclass.
    """
    shared = self._common_field_schema(name, field_info, decorators)

    # a falsy `init_var` is passed as `None` rather than as `False`
    init_only = field_info.init_var or None
    # conversely, a truthy `kw_only` is passed as `None`, and a falsy one as an explicit `False`
    kw_only = False if not field_info.kw_only else None

    return core_schema.dataclass_field(
        name,
        shared['schema'],
        init=field_info.init,
        init_only=init_only,
        kw_only=kw_only,
        serialization_exclude=shared['serialization_exclude'],
        validation_alias=shared['validation_alias'],
        serialization_alias=shared['serialization_alias'],
        frozen=shared['frozen'],
        metadata=shared['metadata'],
    )
@staticmethod
def _apply_alias_generator_to_field_info(
    alias_generator: Callable[[str], str] | AliasGenerator, field_info: FieldInfo, field_name: str
) -> None:
    """Apply an alias_generator to aliases on a FieldInfo instance if appropriate.

    The `field_info` instance is mutated in place; nothing is returned.

    Args:
        alias_generator: A callable that takes a string and returns a string, or an AliasGenerator instance.
        field_info: The FieldInfo instance to which the alias_generator is (maybe) applied.
        field_name: The name of the field from which to generate the alias.

    Raises:
        TypeError: If a plain callable `alias_generator` returns something that is not a `str`.
    """
    # The generator is skipped only when the priority is above 1 *and* all three aliases are already set.
    # In other words, it is applied if
    # 1. An alias is not specified
    # 2. An alias is specified, but the priority is <= 1
    current_priority = field_info.alias_priority
    if (
        current_priority is not None
        and current_priority > 1
        and field_info.alias is not None
        and field_info.validation_alias is not None
        and field_info.serialization_alias is not None
    ):
        return

    generated_alias = generated_validation_alias = generated_serialization_alias = None

    if isinstance(alias_generator, AliasGenerator):
        (
            generated_alias,
            generated_validation_alias,
            generated_serialization_alias,
        ) = alias_generator.generate_aliases(field_name)
    elif isinstance(alias_generator, Callable):
        generated_alias = alias_generator(field_name)
        if not isinstance(generated_alias, str):
            raise TypeError(f'alias_generator {alias_generator} must return str, not {generated_alias.__class__}')

    # if priority is not set, we set to 1
    # which supports the case where the alias_generator from a child class is used
    # to generate an alias for a field in a parent class
    if field_info.alias_priority is None or field_info.alias_priority <= 1:
        field_info.alias_priority = 1

    # if the priority is 1, then we set the aliases to the generated alias
    if field_info.alias_priority == 1:
        field_info.serialization_alias = _get_first_non_null(generated_serialization_alias, generated_alias)
        field_info.validation_alias = _get_first_non_null(generated_validation_alias, generated_alias)
        field_info.alias = generated_alias

    # if any of the aliases are not set, then we set them to the corresponding generated alias
    if field_info.alias is None:
        field_info.alias = generated_alias
    if field_info.serialization_alias is None:
        field_info.serialization_alias = _get_first_non_null(generated_serialization_alias, generated_alias)
    if field_info.validation_alias is None:
        field_info.validation_alias = _get_first_non_null(generated_validation_alias, generated_alias)
@staticmethod
def _apply_alias_generator_to_computed_field_info(
    alias_generator: Callable[[str], str] | AliasGenerator,
    computed_field_info: ComputedFieldInfo,
    computed_field_name: str,
):
    """Apply an alias_generator to alias on a ComputedFieldInfo instance if appropriate.

    The `computed_field_info` instance is mutated in place; nothing is returned.

    Args:
        alias_generator: A callable that takes a string and returns a string, or an AliasGenerator instance.
        computed_field_info: The ComputedFieldInfo instance to which the alias_generator is (maybe) applied.
        computed_field_name: The name of the computed field from which to generate the alias.

    Raises:
        TypeError: If a plain callable `alias_generator` returns something that is not a `str`.
    """
    # The generator is skipped only when the priority is above 1 *and* an alias is already set.
    # In other words, it is applied if
    # 1. An alias is not specified
    # 2. An alias is specified, but the priority is <= 1
    current_priority = computed_field_info.alias_priority
    if current_priority is not None and current_priority > 1 and computed_field_info.alias is not None:
        return

    generated_alias = generated_serialization_alias = None

    if isinstance(alias_generator, AliasGenerator):
        # the generated validation alias is irrelevant for computed fields and is discarded
        generated_alias, _, generated_serialization_alias = alias_generator.generate_aliases(computed_field_name)
    elif isinstance(alias_generator, Callable):
        generated_alias = alias_generator(computed_field_name)
        if not isinstance(generated_alias, str):
            raise TypeError(f'alias_generator {alias_generator} must return str, not {generated_alias.__class__}')

    # if priority is not set, we set to 1
    # which supports the case where the alias_generator from a child class is used
    # to generate an alias for a field in a parent class
    if computed_field_info.alias_priority is None or computed_field_info.alias_priority <= 1:
        computed_field_info.alias_priority = 1

    # if the priority is 1, then we set the aliases to the generated alias
    # note that we use the serialization_alias with priority over alias, as computed_field
    # aliases are used for serialization only (not validation)
    if computed_field_info.alias_priority == 1:
        computed_field_info.alias = _get_first_non_null(generated_serialization_alias, generated_alias)
@staticmethod
def _apply_field_title_generator_to_field_info(
    config_wrapper: ConfigWrapper, field_info: FieldInfo | ComputedFieldInfo, field_name: str
) -> None:
    """Apply a field_title_generator on a FieldInfo or ComputedFieldInfo instance if appropriate.

    The generator set on the field takes precedence over the one from the config. Nothing
    happens if no generator is available or if the field already has a title.

    Args:
        config_wrapper: The config of the model
        field_info: The FieldInfo or ComputedField instance to which the title_generator is (maybe) applied.
        field_name: The name of the field from which to generate the title.

    Raises:
        TypeError: If the generator returns something that is not a `str`.
    """
    title_generator = field_info.field_title_generator or config_wrapper.field_title_generator

    # nothing to do without a generator, and an explicitly set title is never overridden
    if title_generator is None or field_info.title is not None:
        return

    generated_title = title_generator(field_name, field_info)  # type: ignore
    if not isinstance(generated_title, str):
        raise TypeError(
            f'field_title_generator {title_generator} must return str, not {generated_title.__class__}'
        )

    field_info.title = generated_title
def _common_field_schema(  # noqa: C901
    self, name: str, field_info: FieldInfo, decorators: DecoratorInfos
) -> _CommonField:
    """Build the field definition shared by model, dataclass and typed dict fields.

    Note that `field_info` may be mutated along the way (`validate_default`, the title
    and the aliases can be set).

    Args:
        name: The name of the field.
        field_info: The `FieldInfo` describing the field.
        decorators: The decorators collected on the owning class.
    """
    source_type = field_info.annotation
    annotations = field_info.metadata

    def set_discriminator(schema: CoreSchema) -> CoreSchema:
        return self._apply_discriminator_to_union(schema, field_info.discriminator)

    # Convert `@field_validator` decorators to `Before/After/Plain/WrapValidator` instances:
    validators_from_decorators = [
        _mode_to_validator[decorator.info.mode]._from_decorator(decorator)
        for decorator in filter_field_decorator_info_by_field(decorators.field_validators.values(), name)
    ]

    with self.field_name_stack.push(name):
        # the inner schema transformation is only requested when a discriminator is set
        extra_kwargs: dict[str, Any] = {}
        if field_info.discriminator is not None:
            extra_kwargs['transform_inner_schema'] = set_discriminator
        schema = self._apply_annotations(source_type, annotations + validators_from_decorators, **extra_kwargs)

    # This V1 compatibility shim should eventually be removed
    # push down any `each_item=True` validators
    # note that this won't work for any Annotated types that get wrapped by a function validator
    # but that's okay because that didn't exist in V1
    v1_validators = filter_field_decorator_info_by_field(decorators.validators.values(), name)
    if _validators_require_validate_default(v1_validators):
        field_info.validate_default = True
    each_item_validators = [v for v in v1_validators if v.info.each_item is True]
    whole_value_validators = [v for v in v1_validators if v not in each_item_validators]
    schema = apply_each_item_validators(schema, each_item_validators, name)
    schema = apply_validators(schema, whole_value_validators, name)

    # the default validator needs to go outside of any other validators
    # so that it is the topmost validator for the field validator
    # which uses it to check if the field has a default value or not
    if not field_info.is_required():
        schema = wrap_default(field_info, schema)

    field_serializers = filter_field_decorator_info_by_field(decorators.field_serializers.values(), name)
    schema = self._apply_field_serializers(schema, field_serializers)
    self._apply_field_title_generator_to_field_info(self._config_wrapper, field_info, name)

    pydantic_js_updates, pydantic_js_extra = _extract_json_schema_info_from_field_info(field_info)
    core_metadata: dict[str, Any] = {}
    update_core_metadata(
        core_metadata, pydantic_js_updates=pydantic_js_updates, pydantic_js_extra=pydantic_js_extra
    )

    alias_generator = self._config_wrapper.alias_generator
    if alias_generator is not None:
        self._apply_alias_generator_to_field_info(alias_generator, field_info, name)

    # read the validation alias only now, as the alias generator may just have set it
    validation_alias = field_info.validation_alias
    if isinstance(validation_alias, (AliasChoices, AliasPath)):
        validation_alias = validation_alias.convert_to_aliases()

    # a falsy `exclude` is passed as `None` rather than as `False`
    serialization_exclude = None
    if field_info.exclude:
        serialization_exclude = True

    return _common_field(
        schema,
        serialization_exclude=serialization_exclude,
        validation_alias=validation_alias,
        serialization_alias=field_info.serialization_alias,
        frozen=field_info.frozen,
        metadata=core_metadata,
    )
def _union_schema(self, union_type: Any) -> core_schema.CoreSchema:
    """Generate schema for a Union.

    `None` members are not turned into a union choice: they make the resulting schema
    nullable instead. A union schema is only created when the number of remaining
    choices is not exactly one.
    """
    args = self._get_args_resolving_forward_refs(union_type, required=True)

    nullable = False
    member_schemas: list[CoreSchema] = []
    for arg in args:
        if arg is not None and arg is not _typing_extra.NoneType:
            member_schemas.append(self.generate_schema(arg))
            continue
        nullable = True

    if len(member_schemas) == 1:
        result = member_schemas[0]
    else:
        tagged_members: list[CoreSchema | tuple[CoreSchema, str]] = []
        for member in member_schemas:
            metadata = cast(CoreMetadata, member.get('metadata', {}))
            tag = metadata.get('pydantic_internal_union_tag_key')
            # members carrying a tag in their metadata are passed as `(schema, tag)` pairs
            tagged_members.append(member if tag is None else (member, tag))
        result = core_schema.union_schema(tagged_members)

    return core_schema.nullable_schema(result) if nullable else result
def _type_alias_type_schema(self, obj: TypeAliasType) -> CoreSchema:
    """Generate a referenceable schema for a type alias (possibly parametrized).

    Raises:
        PydanticUndefinedAnnotation: If evaluating the value of the alias raises a `NameError`.
    """
    with self.defs.get_schema_or_ref(obj) as (ref, existing_schema):
        if existing_schema is not None:
            return existing_schema

        # for a parametrized alias, the value lives on the origin
        origin: TypeAliasType = get_origin(obj) or obj
        typevars_map = get_standard_typevars_map(obj)

        with self._ns_resolver.push(origin):
            try:
                evaluated = _typing_extra.eval_type(origin.__value__, *self._types_namespace)
            except NameError as e:
                raise PydanticUndefinedAnnotation.from_name_error(e) from e
            schema = self.generate_schema(replace_types(evaluated, typevars_map))
            assert schema['type'] != 'definitions'
            schema['ref'] = ref  # type: ignore
        return self.defs.create_definition_reference_schema(schema)
def _literal_schema(self, literal_type: Any) -> CoreSchema:
    """Generate schema for a Literal.

    When the `use_enum_values` config is set and at least one of the literal values is an
    `Enum` member, the schema is wrapped so that validated enum members are replaced by
    their `.value`.
    """
    expected = list(get_literal_values(literal_type, type_check=False, unpack_type_aliases='eager'))
    assert expected, f'literal "expected" cannot be empty, obj={literal_type}'
    schema = core_schema.literal_schema(expected)

    if not self._config_wrapper.use_enum_values:
        return schema
    if not any(isinstance(value, Enum) for value in expected):
        return schema

    return core_schema.no_info_after_validator_function(
        lambda v: v.value if isinstance(v, Enum) else v, schema
    )
def _typed_dict_schema(self, typed_dict_cls: Any, origin: Any) -> core_schema.CoreSchema:
    """Generate a core schema for a `TypedDict` class.

    To be able to build a `DecoratorInfos` instance for the `TypedDict` class (which will include
    validators, serializers, etc.), we need to have access to the original bases of the class
    (see https://docs.python.org/3/library/types.html#types.get_original_bases).
    However, the `__orig_bases__` attribute was only added in 3.12 (https://github.com/python/cpython/pull/103698).

    For this reason, we require Python 3.12 (or using the `typing_extensions` backport).

    Args:
        typed_dict_cls: The `TypedDict` class, or a parametrized form of it.
        origin: The unparametrized class when `typed_dict_cls` is parametrized, else `None`.

    Raises:
        PydanticUserError: If `typing.TypedDict` is used where it is not supported.
        PydanticUndefinedAnnotation: If resolving the type hints raises a `NameError`.
    """
    FieldInfo = import_cached_field_info()

    with (
        self.model_type_stack.push(typed_dict_cls),
        self.defs.get_schema_or_ref(typed_dict_cls) as (typed_dict_ref, existing_schema),
    ):
        if existing_schema is not None:
            return existing_schema

        # the typevars map must be computed from the (possibly parametrized) class,
        # before falling back on the origin
        typevars_map = get_standard_typevars_map(typed_dict_cls)
        if origin is not None:
            typed_dict_cls = origin

        if not _SUPPORTS_TYPEDDICT and type(typed_dict_cls).__module__ == 'typing':
            raise PydanticUserError(
                'Please use `typing_extensions.TypedDict` instead of `typing.TypedDict` on Python < 3.12.',
                code='typed-dict-version',
            )

        try:
            # if a typed dictionary class doesn't have config, we use the parent's config, hence a default of `None`
            # see https://github.com/pydantic/pydantic/issues/10917
            config: ConfigDict | None = get_attribute_from_bases(typed_dict_cls, '__pydantic_config__')
        except AttributeError:
            config = None

        with self._config_wrapper_stack.push(config):
            core_config = self._config_wrapper.core_config(title=typed_dict_cls.__name__)
            required_keys: frozenset[str] = typed_dict_cls.__required_keys__
            fields: dict[str, core_schema.TypedDictField] = {}
            decorators = DecoratorInfos.build(typed_dict_cls)

            field_docstrings = None
            if self._config_wrapper.use_attribute_docstrings:
                field_docstrings = extract_docstrings_from_cls(typed_dict_cls, use_inspect=True)

            try:
                annotations = _typing_extra.get_cls_type_hints(typed_dict_cls, ns_resolver=self._ns_resolver)
            except NameError as e:
                raise PydanticUndefinedAnnotation.from_name_error(e) from e

            readonly_fields: list[str] = []

            for field_name, annotation in annotations.items():
                field_info = FieldInfo.from_annotation(annotation, _source=AnnotationSource.TYPED_DICT)
                field_info.annotation = replace_types(field_info.annotation, typevars_map)
                qualifiers = field_info._qualifiers

                # an explicit `not_required` qualifier always wins
                required = (field_name in required_keys or 'required' in qualifiers) and (
                    'not_required' not in qualifiers
                )
                if 'read_only' in qualifiers:
                    readonly_fields.append(field_name)

                # an explicit description takes precedence over the attribute docstring
                if (
                    field_docstrings is not None
                    and field_info.description is None
                    and field_name in field_docstrings
                ):
                    field_info.description = field_docstrings[field_name]
                self._apply_field_title_generator_to_field_info(self._config_wrapper, field_info, field_name)
                fields[field_name] = self._generate_td_field_schema(
                    field_name, field_info, decorators, required=required
                )

            if readonly_fields:
                fields_repr = ', '.join(repr(f) for f in readonly_fields)
                plural = len(readonly_fields) >= 2
                warnings.warn(
                    f'Item{"s" if plural else ""} {fields_repr} on TypedDict class {typed_dict_cls.__name__!r} '
                    f'{"are" if plural else "is"} using the `ReadOnly` qualifier. Pydantic will not protect items '
                    'from any mutation on dictionary instances.',
                    UserWarning,
                )

            computed_fields = [
                self._computed_field_schema(d, decorators.field_serializers)
                for d in decorators.computed_fields.values()
            ]
            td_schema = core_schema.typed_dict_schema(
                fields,
                cls=typed_dict_cls,
                computed_fields=computed_fields,
                ref=typed_dict_ref,
                config=core_config,
            )

            schema = self._apply_model_serializers(td_schema, decorators.model_serializers.values())
            schema = apply_model_validators(schema, decorators.model_validators.values(), 'all')
            return self.defs.create_definition_reference_schema(schema)
def _namedtuple_schema(self, namedtuple_cls: Any, origin: Any) -> core_schema.CoreSchema:
    """Generate schema for a NamedTuple.

    Args:
        namedtuple_cls: The named tuple class, or a parametrized form of it.
        origin: The unparametrized class when `namedtuple_cls` is parametrized, else `None`.

    Raises:
        PydanticUndefinedAnnotation: If resolving the type hints raises a `NameError`.
    """
    with self.model_type_stack.push(namedtuple_cls):
        with self.defs.get_schema_or_ref(namedtuple_cls) as (namedtuple_ref, existing_schema):
            if existing_schema is not None:
                return existing_schema

            # the typevars map must be computed from the (possibly parametrized) class,
            # before falling back on the origin
            typevars_map = get_standard_typevars_map(namedtuple_cls)
            if origin is not None:
                namedtuple_cls = origin

            try:
                annotations = _typing_extra.get_cls_type_hints(namedtuple_cls, ns_resolver=self._ns_resolver)
            except NameError as e:
                raise PydanticUndefinedAnnotation.from_name_error(e) from e
            if not annotations:
                # annotations is empty, happens if namedtuple_cls defined via collections.namedtuple(...)
                annotations: dict[str, Any] = dict.fromkeys(namedtuple_cls._fields, Any)

            if typevars_map:
                annotations = {
                    field_name: replace_types(annotation, typevars_map)
                    for field_name, annotation in annotations.items()
                }

            # one parameter per field; fields without a default get `Parameter.empty`
            parameters = []
            for field_name, annotation in annotations.items():
                default = namedtuple_cls._field_defaults.get(field_name, Parameter.empty)
                parameters.append(
                    self._generate_parameter_schema(
                        field_name,
                        annotation,
                        source=AnnotationSource.NAMED_TUPLE,
                        default=default,
                    )
                )

            arguments_schema = core_schema.arguments_schema(
                parameters,
                metadata={'pydantic_js_prefer_positional_arguments': True},
            )
            schema = core_schema.call_schema(arguments_schema, namedtuple_cls, ref=namedtuple_ref)
            return self.defs.create_definition_reference_schema(schema)
def _generate_parameter_schema(
    self,
    name: str,
    annotation: type[Any],
    source: AnnotationSource,
    default: Any = Parameter.empty,
    mode: Literal['positional_only', 'positional_or_keyword', 'keyword_only'] | None = None,
) -> core_schema.ArgumentsParameter:
    """Generate the definition of a field in a namedtuple or a parameter in a function signature.

    This definition is meant to be used for the `'arguments'` core schema, which will be replaced
    in V3 by the `'arguments-v3'`.

    Args:
        name: The name of the field/parameter.
        annotation: The type annotation of the field/parameter.
        source: Where the annotation comes from, forwarded to the `FieldInfo` constructors.
        default: The default value, or `Parameter.empty` if there is none.
        mode: The parameter mode to set on the resulting schema, if any.

    Returns:
        The parameter definition, with `'mode'` and `'alias'` set when applicable.
    """
    FieldInfo = import_cached_field_info()

    field = (
        FieldInfo.from_annotation(annotation, _source=source)
        if default is Parameter.empty
        else FieldInfo.from_annotated_attribute(annotation, default, _source=source)
    )
    assert field.annotation is not None, 'field.annotation should not be None when generating a schema'

    with self.field_name_stack.push(name):
        inner_schema = self._apply_annotations(field.annotation, [field])

    if not field.is_required():
        inner_schema = wrap_default(field, inner_schema)

    parameter_schema = core_schema.arguments_parameter(name, inner_schema)
    if mode is not None:
        parameter_schema['mode'] = mode

    # An alias set explicitly on the field takes precedence over any alias generator:
    if field.alias is not None:
        parameter_schema['alias'] = field.alias
        return parameter_schema

    alias_generator = self._config_wrapper.alias_generator
    if isinstance(alias_generator, AliasGenerator) and alias_generator.alias is not None:
        parameter_schema['alias'] = alias_generator.alias(name)
    elif callable(alias_generator):
        parameter_schema['alias'] = alias_generator(name)
    return parameter_schema
def _generate_parameter_v3_schema(
    self,
    name: str,
    annotation: Any,
    source: AnnotationSource,
    mode: Literal[
        'positional_only',
        'positional_or_keyword',
        'keyword_only',
        'var_args',
        'var_kwargs_uniform',
        'var_kwargs_unpacked_typed_dict',
    ],
    default: Any = Parameter.empty,
) -> core_schema.ArgumentsV3Parameter:
    """Generate the definition of a parameter in a function signature.

    This definition is meant to be used for the `'arguments-v3'` core schema, which will replace
    the `'arguments'` schema in V3.

    Args:
        name: The name of the parameter.
        annotation: The type annotation of the parameter.
        source: Where the annotation comes from, forwarded to the `FieldInfo` constructors.
        mode: The parameter mode to set on the resulting schema.
        default: The default value, or `Parameter.empty` if there is none.

    Returns:
        The parameter definition, with `'alias'` set when applicable.
    """
    FieldInfo = import_cached_field_info()

    field = (
        FieldInfo.from_annotation(annotation, _source=source)
        if default is Parameter.empty
        else FieldInfo.from_annotated_attribute(annotation, default, _source=source)
    )

    with self.field_name_stack.push(name):
        inner_schema = self._apply_annotations(field.annotation, [field])

    if not field.is_required():
        inner_schema = wrap_default(field, inner_schema)

    parameter_schema = core_schema.arguments_v3_parameter(name=name, schema=inner_schema, mode=mode)

    # An alias set explicitly on the field takes precedence over any alias generator:
    if field.alias is not None:
        parameter_schema['alias'] = field.alias
        return parameter_schema

    alias_generator = self._config_wrapper.alias_generator
    if isinstance(alias_generator, AliasGenerator) and alias_generator.alias is not None:
        parameter_schema['alias'] = alias_generator.alias(name)
    elif callable(alias_generator):
        parameter_schema['alias'] = alias_generator(name)

    return parameter_schema
def _tuple_schema(self, tuple_type: Any) -> core_schema.CoreSchema:
    """Generate schema for a Tuple, e.g. `tuple[int, str]` or `tuple[int, ...]`.

    Raises:
        ValueError: If the last type argument is `...` and there are not exactly two arguments.
    """
    # TODO: do we really need to resolve type vars here?
    typevars_map = get_standard_typevars_map(tuple_type)
    params = self._get_args_resolving_forward_refs(tuple_type)

    if typevars_map and params:
        params = tuple(replace_types(arg, typevars_map) for arg in params)

    # NOTE: subtle difference: `tuple[()]` gives `params=()`, whereas `typing.Tuple[()]` gives `params=((),)`
    # This is only true for <3.11, on Python 3.11+ `typing.Tuple[()]` gives `params=()`
    if not params:
        if tuple_type in TUPLE_TYPES:
            # Bare tuple type: any number of items of any type.
            return core_schema.tuple_schema([core_schema.any_schema()], variadic_item_index=0)
        # special case for `tuple[()]` which means `tuple[]` - an empty tuple
        return core_schema.tuple_schema([])

    if params[-1] is Ellipsis:
        if len(params) != 2:
            # TODO: something like https://github.com/pydantic/pydantic/issues/5952
            raise ValueError('Variable tuples can only have one type')
        return core_schema.tuple_schema([self.generate_schema(params[0])], variadic_item_index=0)

    if len(params) == 1 and params[0] == ():
        # special case for `tuple[()]` which means `tuple[]` - an empty tuple
        # NOTE: This conditional can be removed when we drop support for Python 3.10.
        return core_schema.tuple_schema([])

    return core_schema.tuple_schema(list(map(self.generate_schema, params)))
def _type_schema(self) -> core_schema.CoreSchema:
    """Generate a schema checking that the input is an instance of `type`, with a custom `'is_type'` error."""
    is_type_schema = core_schema.is_instance_schema(type)
    return core_schema.custom_error_schema(
        is_type_schema,
        custom_error_type='is_type',
        custom_error_message='Input should be a type',
    )
def _zoneinfo_schema(self) -> core_schema.CoreSchema:
    """Generate schema for a `zoneinfo.ZoneInfo` object.

    Validation is delegated to `validate_str_is_valid_iana_tz`, serialization uses the
    to-string serializer, and the JSON Schema is a string with the `'zoneinfo'` format.
    """
    from ._validators import validate_str_is_valid_iana_tz

    return core_schema.no_info_plain_validator_function(
        validate_str_is_valid_iana_tz,
        serialization=core_schema.to_string_ser_schema(),
        metadata={'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'zoneinfo'}]},
    )
def _union_is_subclass_schema(self, union_type: Any) -> core_schema.CoreSchema:
    """Generate schema for `type[Union[X, ...]]`.

    The result is a union of the schemas generated for `type[X]` for each member `X` of the union.
    """
    members = self._get_args_resolving_forward_refs(union_type, required=True)
    choices = []
    for member in members:
        choices.append(self.generate_schema(type[member]))
    return core_schema.union_schema(choices)
def _subclass_schema(self, type_: Any) -> core_schema.CoreSchema:
    """Generate schema for a type, e.g. `type[int]`.

    Raises:
        PydanticUserError: If the type argument is an already parametrized generic alias.
        TypeError: If the type argument is neither a class nor `None`.
    """
    type_param = self._get_first_arg_or_any(type_)

    # Assume `type[Annotated[<typ>, ...]]` is equivalent to `type[<typ>]`:
    type_param = _typing_extra.annotated_type(type_param) or type_param

    if typing_objects.is_any(type_param):
        return self._type_schema()

    if typing_objects.is_typealiastype(type_param):
        # Look through the type alias and start over with its value:
        return self.generate_schema(type[type_param.__value__])

    if typing_objects.is_typevar(type_param):
        bound = type_param.__bound__
        if bound:
            if is_union_origin(get_origin(bound)):
                return self._union_is_subclass_schema(bound)
            return core_schema.is_subclass_schema(bound)

        constraints = type_param.__constraints__
        if constraints:
            return core_schema.union_schema([self.generate_schema(type[constraint]) for constraint in constraints])

        # Neither bound nor constrained: any type is accepted.
        return self._type_schema()

    if is_union_origin(get_origin(type_param)):
        return self._union_is_subclass_schema(type_param)

    if typing_objects.is_self(type_param):
        type_param = self._resolve_self_type(type_param)

    if _typing_extra.is_generic_alias(type_param):
        raise PydanticUserError(
            'Subscripting `type[]` with an already parametrized type is not supported. '
            f'Instead of using type[{type_param!r}], use type[{_repr.display_as_type(get_origin(type_param))}].',
            code=None,
        )

    if inspect.isclass(type_param):
        return core_schema.is_subclass_schema(type_param)

    # when using type[None], this doesn't type convert to type[NoneType], and None isn't a class
    # so we handle it manually here
    if type_param is None:
        return core_schema.is_subclass_schema(_typing_extra.NoneType)
    raise TypeError(f'Expected a class, got {type_param!r}')
def _sequence_schema(self, items_type: Any) -> core_schema.CoreSchema:
    """Generate schema for a Sequence, e.g. `Sequence[int]`.

    In JSON mode a (copied) list schema is used. In Python mode the input must be an
    instance of `typing.Sequence`; unless the items type is `Any`, it is additionally
    passed through `sequence_validator` wrapping the list schema.
    """
    from ._serializers import serialize_sequence_via_list

    item_type_schema = self.generate_schema(items_type)
    list_schema = core_schema.list_schema(item_type_schema)

    json_schema = smart_deepcopy(list_schema)

    is_sequence_schema = core_schema.is_instance_schema(typing.Sequence, cls_repr='Sequence')
    if typing_objects.is_any(items_type):
        # No items to validate: the instance check is enough.
        python_schema = is_sequence_schema
    else:
        from ._validators import sequence_validator

        items_validation_schema = core_schema.no_info_wrap_validator_function(sequence_validator, list_schema)
        python_schema = core_schema.chain_schema([is_sequence_schema, items_validation_schema])

    serialization = core_schema.wrap_serializer_function_ser_schema(
        serialize_sequence_via_list, schema=item_type_schema, info_arg=True
    )
    return core_schema.json_or_python_schema(
        json_schema=json_schema,
        python_schema=python_schema,
        serialization=serialization,
    )
def _iterable_schema(self, type_: Any) -> core_schema.GeneratorSchema:
    """Generate a schema for an `Iterable`.

    The items schema is generated from the first type argument of `type_`
    (as returned by `_get_first_arg_or_any`).
    """
    items_schema = self.generate_schema(self._get_first_arg_or_any(type_))
    return core_schema.generator_schema(items_schema)
def _pattern_schema(self, pattern_type: Any) -> core_schema.CoreSchema:
    """Generate schema for a regex pattern type (bare `Pattern`, `Pattern[str]` or `Pattern[bytes]`).

    Raises:
        PydanticSchemaGenerationError: If the pattern is parametrized with anything other than `str` or `bytes`.
    """
    from . import _validators

    metadata = {'pydantic_js_functions': [lambda _1, _2: {'type': 'string', 'format': 'regex'}]}
    # In JSON mode, patterns are serialized as their `pattern` attribute:
    ser = core_schema.plain_serializer_function_ser_schema(
        attrgetter('pattern'), when_used='json', return_schema=core_schema.str_schema()
    )

    if pattern_type is typing.Pattern or pattern_type is re.Pattern:
        # bare type
        validator = _validators.pattern_either_validator
    else:
        param = self._get_args_resolving_forward_refs(pattern_type, required=True)[0]
        if param is str:
            validator = _validators.pattern_str_validator
        elif param is bytes:
            validator = _validators.pattern_bytes_validator
        else:
            raise PydanticSchemaGenerationError(f'Unable to generate pydantic-core schema for {pattern_type!r}.')

    return core_schema.no_info_plain_validator_function(validator, serialization=ser, metadata=metadata)
def _hashable_schema(self) -> core_schema.CoreSchema:
    """Generate a schema checking that the input is hashable, with a custom `'is_hashable'` error.

    In JSON mode, the input first goes through an `any` schema before the instance check.
    """
    json_schema = core_schema.chain_schema(
        [core_schema.any_schema(), core_schema.is_instance_schema(collections.abc.Hashable)]
    )
    python_schema = core_schema.is_instance_schema(collections.abc.Hashable)
    return core_schema.custom_error_schema(
        schema=core_schema.json_or_python_schema(json_schema=json_schema, python_schema=python_schema),
        custom_error_type='is_hashable',
        custom_error_message='Input should be hashable',
    )
def _dataclass_schema(
    self, dataclass: type[StandardDataclass], origin: type[StandardDataclass] | None
) -> core_schema.CoreSchema:
    """Generate schema for a dataclass.

    Args:
        dataclass: The dataclass type to build a schema for. The type variables map
            is computed from this object.
        origin: When not `None`, the class used in place of `dataclass` for collecting
            the config, fields and decorators. Also forwarded as `generic_origin`.

    Returns:
        An already available schema (registered in the definitions or stored on the class
        as `__pydantic_core_schema__`) if there is one, otherwise a newly built schema.

    Raises:
        PydanticUserError: If a field has `init=False` while the config sets `extra='allow'`.
    """
    with self.model_type_stack.push(dataclass), self.defs.get_schema_or_ref(dataclass) as (
        dataclass_ref,
        registered_schema,
    ):
        if registered_schema is not None:
            return registered_schema

        # Reuse the schema stored on the class itself (not inherited), unless it is only a mock:
        stored_schema = dataclass.__dict__.get('__pydantic_core_schema__')
        if stored_schema is not None and not isinstance(stored_schema, MockCoreSchema):
            if stored_schema['type'] == 'definitions':
                stored_schema = self.defs.unpack_definitions(stored_schema)
            if get_ref(stored_schema):
                return self.defs.create_definition_reference_schema(stored_schema)
            return stored_schema

        # The typevars map is derived from the type as it was passed in, *before* switching to `origin`:
        typevars_map = get_standard_typevars_map(dataclass)
        if origin is not None:
            dataclass = origin

        # if (plain) dataclass doesn't have config, we use the parent's config, hence a default of `None`
        # (Pydantic dataclasses have an empty dict config by default).
        # see https://github.com/pydantic/pydantic/issues/10917
        config = getattr(dataclass, '__pydantic_config__', None)

        from ..dataclasses import is_pydantic_dataclass

        with self._ns_resolver.push(dataclass), self._config_wrapper_stack.push(config):
            if is_pydantic_dataclass(dataclass):
                # Copy the field info instances to avoid mutating the `FieldInfo` instances
                # of the generic dataclass generic origin (e.g. `apply_typevars_map` below).
                # Note that we don't apply `deepcopy` on `__pydantic_fields__` because we
                # don't want to copy the `FieldInfo` attributes:
                fields = {}
                for f_name, field_info in dataclass.__pydantic_fields__.items():
                    fields[f_name] = copy(field_info)
                if typevars_map:
                    for field_info in fields.values():
                        field_info.apply_typevars_map(typevars_map, *self._types_namespace)
            else:
                fields = collect_dataclass_fields(
                    dataclass,
                    typevars_map=typevars_map,
                    config_wrapper=self._config_wrapper,
                )

            if self._config_wrapper.extra == 'allow':
                # disallow combination of init=False on a dataclass field and extra='allow' on a dataclass
                for field_name, field_info in fields.items():
                    if field_info.init is False:
                        raise PydanticUserError(
                            f'Field {field_name} has `init=False` and dataclass has config setting `extra="allow"`. '
                            f'This combination is not allowed.',
                            code='dataclass-init-false-extra-allow',
                        )

            decorators = dataclass.__dict__.get('__pydantic_decorators__') or DecoratorInfos.build(dataclass)

            # Move kw_only=False args to the start of the list, as this is how vanilla dataclasses work.
            # Note that when kw_only is missing or None, it is treated as equivalent to kw_only=True
            # (the sort is stable, so the relative order inside each group is kept).
            args = [
                self._generate_dc_field_schema(f_name, field_info, decorators)
                for f_name, field_info in fields.items()
            ]
            args.sort(key=lambda arg: arg.get('kw_only') is not False)

            has_post_init = hasattr(dataclass, '__post_init__')
            has_slots = hasattr(dataclass, '__slots__')

            computed_fields = [
                self._computed_field_schema(decorator, decorators.field_serializers)
                for decorator in decorators.computed_fields.values()
            ]
            args_schema = core_schema.dataclass_args_schema(
                dataclass.__name__,
                args,
                computed_fields=computed_fields,
                collect_init_only=has_post_init,
            )

            model_validators = decorators.model_validators.values()
            inner_schema = apply_validators(args_schema, decorators.root_validators.values(), None)
            inner_schema = apply_model_validators(inner_schema, model_validators, 'inner')

            core_config = self._config_wrapper.core_config(title=dataclass.__name__)
            stdlib_field_names = [dc_field.name for dc_field in dataclasses.fields(dataclass)]

            dc_schema = core_schema.dataclass_schema(
                dataclass,
                inner_schema,
                generic_origin=origin,
                post_init=has_post_init,
                ref=dataclass_ref,
                fields=stdlib_field_names,
                slots=has_slots,
                config=core_config,
                # we don't use a custom __setattr__ for dataclasses, so we must
                # pass along the frozen config setting to the pydantic-core schema
                frozen=self._config_wrapper_stack.tail.frozen,
            )
            outer_schema = self._apply_model_serializers(dc_schema, decorators.model_serializers.values())
            outer_schema = apply_model_validators(outer_schema, model_validators, 'outer')
            return self.defs.create_definition_reference_schema(outer_schema)
def _call_schema(self, function: ValidateCallSupportedTypes) -> core_schema.CallSchema:
    """Generate schema for a Callable.

    The return value only gets a schema when the `validate_return` config setting is
    enabled and the function has a return annotation.

    TODO support functional validators once we support them in Config
    """
    arguments_schema = self._arguments_schema(function)
    return_schema: core_schema.CoreSchema | None = None

    if self._config_wrapper.validate_return:
        sig = signature(function)
        if sig.return_annotation is not sig.empty:
            globalns, localns = self._types_namespace
            # Only the return annotation needs to be evaluated here:
            return_hints = _typing_extra.get_function_type_hints(
                function, globalns=globalns, localns=localns, include_keys={'return'}
            )
            return_schema = self.generate_schema(return_hints['return'])

    return core_schema.call_schema(arguments_schema, function, return_schema=return_schema)
def _arguments_schema(
    self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
) -> core_schema.ArgumentsSchema:
    """Generate schema for a Signature.

    Args:
        function: The callable whose signature is used to build the schema.
        parameters_callback: Optional callable invoked for each parameter with its index, name
            and annotation (`Any` for unannotated parameters). Parameters for which it returns
            `'skip'` are left out of the schema.

    Returns:
        The `'arguments'` core schema for the signature of `function`.

    Raises:
        PydanticUserError: If `**kwargs` is annotated with `Unpack[...]` of something that is not
            a `TypedDict`, or if the typed dictionary keys overlap with other parameters.
    """
    mode_lookup: dict[_ParameterKind, Literal['positional_only', 'positional_or_keyword', 'keyword_only']] = {
        Parameter.POSITIONAL_ONLY: 'positional_only',
        Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
        Parameter.KEYWORD_ONLY: 'keyword_only',
    }

    sig = signature(function)
    globalns, localns = self._types_namespace
    type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)

    arguments_list: list[core_schema.ArgumentsParameter] = []
    var_args_schema: core_schema.CoreSchema | None = None
    var_kwargs_schema: core_schema.CoreSchema | None = None
    var_kwargs_mode: core_schema.VarKwargsMode | None = None

    for index, (name, param) in enumerate(sig.parameters.items()):
        annotation = typing.cast(Any, Any) if param.annotation is sig.empty else type_hints[name]

        if parameters_callback is not None and parameters_callback(index, name, annotation) == 'skip':
            continue

        parameter_mode = mode_lookup.get(param.kind)
        if parameter_mode is not None:
            arguments_list.append(
                self._generate_parameter_schema(
                    name, annotation, AnnotationSource.FUNCTION, param.default, parameter_mode
                )
            )
            continue

        if param.kind == Parameter.VAR_POSITIONAL:
            var_args_schema = self.generate_schema(annotation)
            continue

        assert param.kind == Parameter.VAR_KEYWORD, param.kind

        unpack_type = _typing_extra.unpack_type(annotation)
        if unpack_type is None:
            # `**kwargs: T`: every keyword argument is validated against the same schema.
            var_kwargs_mode = 'uniform'
            var_kwargs_schema = self.generate_schema(annotation)
            continue

        # `**kwargs: Unpack[TD]`: `TD` must be a typed dictionary whose keys do not
        # clash with parameters that can be passed by keyword.
        generic_origin = get_origin(unpack_type)
        origin = generic_origin or unpack_type
        if not is_typeddict(origin):
            raise PydanticUserError(
                f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
                code='unpack-typed-dict',
            )
        non_pos_only_param_names = {
            other_name
            for other_name, other_param in sig.parameters.items()
            if other_param.kind != Parameter.POSITIONAL_ONLY
        }
        overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
        if overlapping_params:
            raise PydanticUserError(
                f'Typed dictionary {origin.__name__!r} overlaps with parameter'
                f'{"s" if len(overlapping_params) >= 2 else ""} '
                f'{", ".join(repr(overlapping) for overlapping in sorted(overlapping_params))}',
                code='overlapping-unpack-typed-dict',
            )

        var_kwargs_mode = 'unpacked-typed-dict'
        var_kwargs_schema = self._typed_dict_schema(unpack_type, generic_origin)

    return core_schema.arguments_schema(
        arguments_list,
        var_args_schema=var_args_schema,
        var_kwargs_mode=var_kwargs_mode,
        var_kwargs_schema=var_kwargs_schema,
        validate_by_name=self._config_wrapper.validate_by_name,
    )
def _arguments_v3_schema(
    self, function: ValidateCallSupportedTypes, parameters_callback: ParametersCallback | None = None
) -> core_schema.ArgumentsV3Schema:
    """Generate the `'arguments-v3'` schema for the signature of a callable.

    Args:
        function: The callable whose signature is used to build the schema.
        parameters_callback: Optional callable invoked for each parameter with its index, name
            and the annotation as found on the signature (i.e. before type hints evaluation).
            Parameters for which it returns `'skip'` are left out of the schema.

    Returns:
        The `'arguments-v3'` core schema for the signature of `function`.

    Raises:
        PydanticUserError: If `**kwargs` is annotated with `Unpack[...]` of something that is not
            a `TypedDict`, or if the typed dictionary keys overlap with other parameters.
    """
    mode_lookup: dict[
        _ParameterKind, Literal['positional_only', 'positional_or_keyword', 'var_args', 'keyword_only']
    ] = {
        Parameter.POSITIONAL_ONLY: 'positional_only',
        Parameter.POSITIONAL_OR_KEYWORD: 'positional_or_keyword',
        Parameter.VAR_POSITIONAL: 'var_args',
        Parameter.KEYWORD_ONLY: 'keyword_only',
    }

    sig = signature(function)
    globalns, localns = self._types_namespace
    type_hints = _typing_extra.get_function_type_hints(function, globalns=globalns, localns=localns)

    parameters_list: list[core_schema.ArgumentsV3Parameter] = []

    for index, (name, param) in enumerate(sig.parameters.items()):
        # The callback is given the raw signature annotation, not the evaluated type hint:
        if parameters_callback is not None and parameters_callback(index, name, param.annotation) == 'skip':
            continue

        annotation = typing.cast(Any, Any) if param.annotation is Parameter.empty else type_hints[name]

        parameter_mode = mode_lookup.get(param.kind)
        if parameter_mode is None:
            assert param.kind == Parameter.VAR_KEYWORD, param.kind

            unpack_type = _typing_extra.unpack_type(annotation)
            if unpack_type is None:
                # `**kwargs: T`: every keyword argument is validated against the same schema.
                parameter_mode = 'var_kwargs_uniform'
            else:
                # `**kwargs: Unpack[TD]`: `TD` must be a typed dictionary whose keys do not
                # clash with parameters that can be passed by keyword.
                origin = get_origin(unpack_type) or unpack_type
                if not is_typeddict(origin):
                    raise PydanticUserError(
                        f'Expected a `TypedDict` class inside `Unpack[...]`, got {unpack_type!r}',
                        code='unpack-typed-dict',
                    )
                non_pos_only_param_names = {
                    other_name
                    for other_name, other_param in sig.parameters.items()
                    if other_param.kind != Parameter.POSITIONAL_ONLY
                }
                overlapping_params = non_pos_only_param_names.intersection(origin.__annotations__)
                if overlapping_params:
                    raise PydanticUserError(
                        f'Typed dictionary {origin.__name__!r} overlaps with parameter'
                        f'{"s" if len(overlapping_params) >= 2 else ""} '
                        f'{", ".join(repr(overlapping) for overlapping in sorted(overlapping_params))}',
                        code='overlapping-unpack-typed-dict',
                    )
                parameter_mode = 'var_kwargs_unpacked_typed_dict'
                annotation = unpack_type

        parameter_schema = self._generate_parameter_v3_schema(
            name, annotation, AnnotationSource.FUNCTION, parameter_mode, default=param.default
        )
        parameters_list.append(parameter_schema)

    return core_schema.arguments_v3_schema(
        parameters_list,
        validate_by_name=self._config_wrapper.validate_by_name,
    )
def _unsubstituted_typevar_schema(self, typevar: typing.TypeVar) -> core_schema.CoreSchema:
    """Generate a schema for a type variable that was not substituted.

    The first applicable rule wins: the type variable default, then a union of its
    constraints, then its bound, and finally an `any` schema.
    """
    try:
        has_default = typevar.has_default()
    except AttributeError:
        # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13
        has_default = False

    if has_default:
        return self.generate_schema(typevar.__default__)

    constraints = typevar.__constraints__
    if constraints:
        return self._union_schema(typing.Union[constraints])

    bound = typevar.__bound__
    if bound:
        bound_schema = self.generate_schema(bound)
        # Validation follows the bound, but serialization goes through an `any` schema:
        bound_schema['serialization'] = core_schema.wrap_serializer_function_ser_schema(
            lambda x, h: h(x),
            schema=core_schema.any_schema(),
        )
        return bound_schema

    return core_schema.any_schema()
def _computed_field_schema(
    self,
    d: Decorator[ComputedFieldInfo],
    field_serializers: dict[str, Decorator[FieldSerializerDecoratorInfo]],
) -> core_schema.ComputedField:
    """Build the core schema entry for a computed field.

    Args:
        d: The computed field decorator. Its `info` attribute is replaced by a copy
            carrying the resolved return type.
        field_serializers: The field serializer decorators; those selected for this
            computed field by `filter_field_decorator_info_by_field` are applied to
            the return type schema.

    Returns:
        The computed field schema.

    Raises:
        PydanticUndefinedAnnotation: If inferring the return type raises a `NameError`.
        PydanticUserError: If no return type could be determined.
    """
    resolved_type = d.info.return_type
    if resolved_type is PydanticUndefined:
        try:
            # Do not pass in globals as the function could be defined in a different module.
            # Instead, let `get_callable_return_type` infer the globals to use, but still pass
            # in locals that may contain a parent/rebuild namespace:
            resolved_type = _decorators.get_callable_return_type(d.func, localns=self._types_namespace.locals)
        except NameError as exc:
            raise PydanticUndefinedAnnotation.from_name_error(exc) from exc
        if resolved_type is PydanticUndefined:
            raise PydanticUserError(
                'Computed field is missing return type annotation or specifying `return_type`'
                ' to the `@computed_field` decorator (e.g. `@computed_field(return_type=int | str)`)',
                code='model-field-missing-annotation',
            )

    resolved_type = replace_types(resolved_type, self._typevars_map)
    # Create a new ComputedFieldInfo so that different type parametrizations of the same
    # generic model's computed field can have different return types.
    d.info = dataclasses.replace(d.info, return_type=resolved_type)

    return_schema = self.generate_schema(resolved_type)
    # Apply serializers to the computed field, if any exist
    return_schema = self._apply_field_serializers(
        return_schema,
        filter_field_decorator_info_by_field(field_serializers.values(), d.cls_var_name),
    )

    alias_generator = self._config_wrapper.alias_generator
    if alias_generator is not None:
        self._apply_alias_generator_to_computed_field_info(
            alias_generator=alias_generator, computed_field_info=d.info, computed_field_name=d.cls_var_name
        )
    self._apply_field_title_generator_to_field_info(self._config_wrapper, d.info, d.cls_var_name)

    js_updates, js_extra = _extract_json_schema_info_from_field_info(d.info)
    core_metadata: dict[str, Any] = {}
    # `readOnly` is always set first; updates extracted from the field info are merged after it.
    update_core_metadata(
        core_metadata,
        pydantic_js_updates={'readOnly': True, **(js_updates or {})},
        pydantic_js_extra=js_extra,
    )
    return core_schema.computed_field(
        d.cls_var_name, return_schema=return_schema, alias=d.info.alias, metadata=core_metadata
    )
def _annotated_schema(self, annotated_type: Any) -> core_schema.CoreSchema:
    """Generate schema for an Annotated type, e.g. `Annotated[int, Field(...)]` or `Annotated[int, Gt(0)]`.

    The first argument is treated as the source type and the remaining arguments as the
    annotations applied to it.
    """
    field_info_cls = import_cached_field_info()
    source_type, *metadata_items = self._get_args_resolving_forward_refs(annotated_type, required=True)
    schema = self._apply_annotations(source_type, metadata_items)

    # put the default validator last so that TypeAdapter.get_default_value() works
    # even if there are function validators involved
    field_infos = [item for item in metadata_items if isinstance(item, field_info_cls)]
    for field_info in field_infos:
        schema = wrap_default(field_info, schema)
    return schema
def _apply_annotations(
    self,
    source_type: Any,
    annotations: list[Any],
    transform_inner_schema: Callable[[CoreSchema], CoreSchema] = lambda x: x,
) -> CoreSchema:
    """Apply arguments from `Annotated` or from `FieldInfo` to a schema.

    `GenerateSchema._annotated_schema` only unpacks the `Annotated` form and then calls
    this method, which does the processing. Consequently `source_type` is not expected to
    be an `Annotated` object here: it is the first argument of one, and `annotations`
    holds the remaining arguments.
    """
    expanded = list(_known_annotated_metadata.expand_grouped_metadata(annotations))
    js_functions: list[GetJsonSchemaFunction] = []

    def innermost(obj: Any) -> CoreSchema:
        generated = self._generate_schema_from_get_schema_method(obj, source_type)
        if generated is None:
            generated = self._generate_schema_inner(obj)

        js_function = _extract_get_pydantic_json_schema(obj)
        if js_function is not None:
            target = resolve_original_schema(generated, self.defs)
            if target is not None:
                self._add_js_function(target, js_function)
        return transform_inner_schema(generated)

    # Each non-`None` annotation wraps the handler built so far.
    handler = CallbackGetCoreSchemaHandler(innermost, self)
    for item in expanded:
        if item is not None:
            handler = self._get_wrapped_inner_schema(handler, item, js_functions)

    schema = handler(source_type)
    if js_functions:
        update_core_metadata(schema.setdefault('metadata', {}), pydantic_js_annotation_functions=js_functions)
    return _add_custom_serialization_from_json_encoders(self._config_wrapper.json_encoders, source_type, schema)
def _apply_single_annotation(self, schema: core_schema.CoreSchema, metadata: Any) -> core_schema.CoreSchema:
    """Apply one piece of annotation metadata to `schema`.

    - A `FieldInfo` is handled by applying each item of its `metadata` in turn, followed
      by its discriminator (if set).
    - For a `'nullable'` schema, the metadata is applied to the inner schema.
    - Otherwise the metadata is handed to `apply_known_metadata`; when that returns
      `None`, the schema that was passed in is returned.
    """
    field_info_cls = import_cached_field_info()

    if isinstance(metadata, field_info_cls):
        for nested in metadata.metadata:
            schema = self._apply_single_annotation(schema, nested)

        discriminator = metadata.discriminator
        if discriminator is not None:
            schema = self._apply_discriminator_to_union(schema, discriminator)
        return schema

    if schema['type'] == 'nullable':
        # for nullable schemas, metadata is automatically applied to the inner schema
        updated_inner = self._apply_single_annotation(schema.get('schema', core_schema.any_schema()), metadata)
        if updated_inner:
            schema['schema'] = updated_inner
        return schema

    untouched = schema
    ref = schema.get('ref')
    if ref is not None:
        # Work on a copy registered under a ref derived from the metadata's repr.
        schema = schema.copy()
        derived_ref = ref + f'_{metadata!r}'
        cached = self.defs.get_schema_from_ref(derived_ref)
        if cached is not None:
            return cached
        schema['ref'] = derived_ref  # pyright: ignore[reportGeneralTypeIssues]
    elif schema['type'] == 'definition-ref':
        ref = schema['schema_ref']
        target = self.defs.get_schema_from_ref(ref)
        if target is not None:
            schema = target.copy()
            derived_ref = ref + f'_{metadata!r}'
            cached = self.defs.get_schema_from_ref(derived_ref)
            if cached is not None:
                return cached
            schema['ref'] = derived_ref  # pyright: ignore[reportGeneralTypeIssues]

    updated = _known_annotated_metadata.apply_known_metadata(metadata, schema)
    return untouched if updated is None else updated
def _apply_single_annotation_json_schema(
    self, schema: core_schema.CoreSchema, metadata: Any
) -> core_schema.CoreSchema:
    """Record JSON schema information carried by a `FieldInfo` in the schema's metadata.

    Metadata that is not a `FieldInfo` leaves the schema as is. For a `FieldInfo`, each
    item of its `metadata` is processed first, then the JSON schema updates/extra
    extracted from the `FieldInfo` itself are stored in the core metadata.
    """
    field_info_cls = import_cached_field_info()
    if not isinstance(metadata, field_info_cls):
        return schema

    for nested in metadata.metadata:
        schema = self._apply_single_annotation_json_schema(schema, nested)

    js_updates, js_extra = _extract_json_schema_info_from_field_info(metadata)
    update_core_metadata(
        schema.setdefault('metadata', {}),
        pydantic_js_updates=js_updates,
        pydantic_js_extra=js_extra,
    )
    return schema
def _get_wrapped_inner_schema(
    self,
    get_inner_schema: GetCoreSchemaHandler,
    annotation: Any,
    pydantic_js_annotation_functions: list[GetJsonSchemaFunction],
) -> CallbackGetCoreSchemaHandler:
    """Wrap `get_inner_schema` in a handler that also applies `annotation`.

    When called, the returned handler uses the annotation's `__get_pydantic_core_schema__`
    hook if it defines one; otherwise it calls `get_inner_schema` and applies the annotation
    to the result. In both cases, a JSON schema function extracted from the annotation is
    appended to `pydantic_js_annotation_functions`.
    """
    custom_hook: GetCoreSchemaFunction | None = getattr(annotation, '__get_pydantic_core_schema__', None)

    def wrapped(source: Any) -> core_schema.CoreSchema:
        if custom_hook is None:
            result = get_inner_schema(source)
            result = self._apply_single_annotation(result, annotation)
            result = self._apply_single_annotation_json_schema(result, annotation)
        else:
            result = custom_hook(source, get_inner_schema)

        js_function = _extract_get_pydantic_json_schema(annotation)
        if js_function is not None:
            pydantic_js_annotation_functions.append(js_function)
        return result

    return CallbackGetCoreSchemaHandler(wrapped, self)
2366 def _apply_field_serializers( 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2367 self,
2368 schema: core_schema.CoreSchema,
2369 serializers: list[Decorator[FieldSerializerDecoratorInfo]],
2370 ) -> core_schema.CoreSchema:
2371 """Apply field serializers to a schema."""
2372 if serializers: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2373 schema = copy(schema) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2374 if schema['type'] == 'definitions': 2374 ↛ 2375line 2374 didn't jump to line 2375 because the condition on line 2374 was never true1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2375 inner_schema = schema['schema']
2376 schema['schema'] = self._apply_field_serializers(inner_schema, serializers)
2377 return schema
2378 elif 'ref' in schema: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2379 schema = self.defs.create_definition_reference_schema(schema) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2381 # use the last serializer to make it easy to override a serializer set on a parent model
2382 serializer = serializers[-1] 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2383 is_field_serializer, info_arg = inspect_field_serializer(serializer.func, serializer.info.mode) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2385 if serializer.info.return_type is not PydanticUndefined: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2386 return_type = serializer.info.return_type 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2387 else:
2388 try: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2389 # Do not pass in globals as the function could be defined in a different module.
2390 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2391 # in locals that may contain a parent/rebuild namespace:
2392 return_type = _decorators.get_callable_return_type( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2393 serializer.func, localns=self._types_namespace.locals
2394 )
2395 except NameError as e: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2396 raise PydanticUndefinedAnnotation.from_name_error(e) from e 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2398 if return_type is PydanticUndefined: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2399 return_schema = None 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2400 else:
2401 return_schema = self.generate_schema(return_type) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2403 if serializer.info.mode == 'wrap': 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2404 schema['serialization'] = core_schema.wrap_serializer_function_ser_schema( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2405 serializer.func,
2406 is_field_serializer=is_field_serializer,
2407 info_arg=info_arg,
2408 return_schema=return_schema,
2409 when_used=serializer.info.when_used,
2410 )
2411 else:
2412 assert serializer.info.mode == 'plain' 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2413 schema['serialization'] = core_schema.plain_serializer_function_ser_schema( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2414 serializer.func,
2415 is_field_serializer=is_field_serializer,
2416 info_arg=info_arg,
2417 return_schema=return_schema,
2418 when_used=serializer.info.when_used,
2419 )
2420 return schema 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2422 def _apply_model_serializers( 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2423 self, schema: core_schema.CoreSchema, serializers: Iterable[Decorator[ModelSerializerDecoratorInfo]]
2424 ) -> core_schema.CoreSchema:
2425 """Apply model serializers to a schema."""
2426 ref: str | None = schema.pop('ref', None) # type: ignore 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2427 if serializers: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2428 serializer = list(serializers)[-1] 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2429 info_arg = inspect_model_serializer(serializer.func, serializer.info.mode) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2431 if serializer.info.return_type is not PydanticUndefined: 2431 ↛ 2432line 2431 didn't jump to line 2432 because the condition on line 2431 was never true1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2432 return_type = serializer.info.return_type
2433 else:
2434 try: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2435 # Do not pass in globals as the function could be defined in a different module.
2436 # Instead, let `get_callable_return_type` infer the globals to use, but still pass
2437 # in locals that may contain a parent/rebuild namespace:
2438 return_type = _decorators.get_callable_return_type( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2439 serializer.func, localns=self._types_namespace.locals
2440 )
2441 except NameError as e:
2442 raise PydanticUndefinedAnnotation.from_name_error(e) from e
2444 if return_type is PydanticUndefined: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2445 return_schema = None 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2446 else:
2447 return_schema = self.generate_schema(return_type) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2449 if serializer.info.mode == 'wrap': 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2450 ser_schema: core_schema.SerSchema = core_schema.wrap_serializer_function_ser_schema( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2451 serializer.func,
2452 info_arg=info_arg,
2453 return_schema=return_schema,
2454 when_used=serializer.info.when_used,
2455 )
2456 else:
2457 # plain
2458 ser_schema = core_schema.plain_serializer_function_ser_schema( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2459 serializer.func,
2460 info_arg=info_arg,
2461 return_schema=return_schema,
2462 when_used=serializer.info.when_used,
2463 )
2464 schema['serialization'] = ser_schema 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2465 if ref: 2465 ↛ 2467line 2465 didn't jump to line 2467 because the condition on line 2465 was always true1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2466 schema['ref'] = ref # type: ignore 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2467 return schema 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2470_VALIDATOR_F_MATCH: Mapping[ 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2471 tuple[FieldValidatorModes, Literal['no-info', 'with-info']],
2472 Callable[[Callable[..., Any], core_schema.CoreSchema, str | None], core_schema.CoreSchema],
2473] = {
2474 ('before', 'no-info'): lambda f, schema, _: core_schema.no_info_before_validator_function(f, schema),
2475 ('after', 'no-info'): lambda f, schema, _: core_schema.no_info_after_validator_function(f, schema),
2476 ('plain', 'no-info'): lambda f, _1, _2: core_schema.no_info_plain_validator_function(f),
2477 ('wrap', 'no-info'): lambda f, schema, _: core_schema.no_info_wrap_validator_function(f, schema),
2478 ('before', 'with-info'): lambda f, schema, field_name: core_schema.with_info_before_validator_function(
2479 f, schema, field_name=field_name
2480 ),
2481 ('after', 'with-info'): lambda f, schema, field_name: core_schema.with_info_after_validator_function(
2482 f, schema, field_name=field_name
2483 ),
2484 ('plain', 'with-info'): lambda f, _, field_name: core_schema.with_info_plain_validator_function(
2485 f, field_name=field_name
2486 ),
2487 ('wrap', 'with-info'): lambda f, schema, field_name: core_schema.with_info_wrap_validator_function(
2488 f, schema, field_name=field_name
2489 ),
2490}
# TODO V3: this function is only used for deprecated decorators. It should
# be removed once we drop support for those.
def apply_validators(
    schema: core_schema.CoreSchema,
    validators: Iterable[Decorator[RootValidatorDecoratorInfo]]
    | Iterable[Decorator[ValidatorDecoratorInfo]]
    | Iterable[Decorator[FieldValidatorDecoratorInfo]],
    field_name: str | None,
) -> core_schema.CoreSchema:
    """Apply validators to a schema.

    Each validator wraps the schema produced by the previous one, in iteration order.

    Args:
        schema: The schema to apply validators on.
        validators: An iterable of validators.
        field_name: The name of the field if validators are being applied to a model field.

    Returns:
        The updated schema.
    """
    wrapped = schema
    for decorator in validators:
        takes_info = inspect_validator(decorator.func, decorator.info.mode)
        flavor = 'with-info' if takes_info else 'no-info'
        build = _VALIDATOR_F_MATCH[(decorator.info.mode, flavor)]
        wrapped = build(decorator.func, wrapped, field_name)
    return wrapped
2520def _validators_require_validate_default(validators: Iterable[Decorator[ValidatorDecoratorInfo]]) -> bool: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2521 """In v1, if any of the validators for a field had `always=True`, the default value would be validated.
2523 This serves as an auxiliary function for re-implementing that logic, by looping over a provided
2524 collection of (v1-style) ValidatorDecoratorInfo's and checking if any of them have `always=True`.
2526 We should be able to drop this function and the associated logic calling it once we drop support
2527 for v1-style validator decorators. (Or we can extend it and keep it if we add something equivalent
2528 to the v1-validator `always` kwarg to `field_validator`.)
2529 """
2530 for validator in validators: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2531 if validator.info.always: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2532 return True 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2533 return False 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
def apply_model_validators(
    schema: core_schema.CoreSchema,
    validators: Iterable[Decorator[ModelValidatorDecoratorInfo]],
    mode: Literal['inner', 'outer', 'all'],
) -> core_schema.CoreSchema:
    """Apply model validators to a schema.

    If mode == 'inner', only "before" validators are applied
    If mode == 'outer', validators other than "before" are applied
    If mode == 'all', all validators are applied

    The `'ref'` of the incoming schema (if any) is removed from it and set on the
    returned schema.

    Args:
        schema: The schema to apply validators on.
        validators: An iterable of validators.
        mode: The validator mode.

    Returns:
        The updated schema.
    """
    ref: str | None = schema.pop('ref', None)  # type: ignore

    for validator in validators:
        validator_mode = validator.info.mode
        is_before = validator_mode == 'before'
        if (mode == 'inner' and not is_before) or (mode == 'outer' and is_before):
            continue

        takes_info = inspect_validator(validator.func, validator_mode)
        if validator_mode == 'wrap':
            if takes_info:
                build = core_schema.with_info_wrap_validator_function
            else:
                build = core_schema.no_info_wrap_validator_function
        elif is_before:
            if takes_info:
                build = core_schema.with_info_before_validator_function
            else:
                build = core_schema.no_info_before_validator_function
        else:
            assert validator_mode == 'after'
            if takes_info:
                build = core_schema.with_info_after_validator_function
            else:
                build = core_schema.no_info_after_validator_function
        schema = build(function=validator.func, schema=schema)

    if ref:
        schema['ref'] = ref  # type: ignore
    return schema
def wrap_default(field_info: FieldInfo, schema: core_schema.CoreSchema) -> core_schema.CoreSchema:
    """Wrap schema with default schema if default value or `default_factory` are available.

    A `default_factory` takes precedence over a default value. When neither is available,
    the schema is returned unchanged.

    Args:
        field_info: The field info object.
        schema: The schema to apply default on.

    Returns:
        Updated schema by default value or `default_factory`.
    """
    factory = field_info.default_factory
    if factory:
        return core_schema.with_default_schema(
            schema,
            default_factory=factory,
            default_factory_takes_data=takes_validated_data_argument(factory),
            validate_default=field_info.validate_default,
        )

    if field_info.default is PydanticUndefined:
        return schema

    return core_schema.with_default_schema(
        schema, default=field_info.default, validate_default=field_info.validate_default
    )
2608def _extract_get_pydantic_json_schema(tp: Any) -> GetJsonSchemaFunction | None: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2609 """Extract `__get_pydantic_json_schema__` from a type, handling the deprecated `__modify_schema__`."""
2610 js_modify_function = getattr(tp, '__get_pydantic_json_schema__', None) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2612 if hasattr(tp, '__modify_schema__'): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2613 BaseModel = import_cached_base_model() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2615 has_custom_v2_modify_js_func = ( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2616 js_modify_function is not None
2617 and BaseModel.__get_pydantic_json_schema__.__func__ # type: ignore
2618 not in (js_modify_function, getattr(js_modify_function, '__func__', None))
2619 )
2621 if not has_custom_v2_modify_js_func: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2622 cls_name = getattr(tp, '__name__', None) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2623 raise PydanticUserError( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2624 f'The `__modify_schema__` method is not supported in Pydantic v2. '
2625 f'Use `__get_pydantic_json_schema__` instead{f" in class `{cls_name}`" if cls_name else ""}.',
2626 code='custom-json-schema',
2627 )
2629 if (origin := get_origin(tp)) is not None: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2630 # Generic aliases proxy attribute access to the origin, *except* dunder attributes,
2631 # such as `__get_pydantic_json_schema__`, hence the explicit check.
2632 return _extract_get_pydantic_json_schema(origin) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2634 if js_modify_function is None: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2635 return None 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2637 return js_modify_function 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2640class _CommonField(TypedDict): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2641 schema: core_schema.CoreSchema 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2642 validation_alias: str | list[str | int] | list[list[str | int]] | None 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2643 serialization_alias: str | None 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2644 serialization_exclude: bool | None 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2645 frozen: bool | None 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2646 metadata: dict[str, Any] 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2649def _common_field( 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2650 schema: core_schema.CoreSchema,
2651 *,
2652 validation_alias: str | list[str | int] | list[list[str | int]] | None = None,
2653 serialization_alias: str | None = None,
2654 serialization_exclude: bool | None = None,
2655 frozen: bool | None = None,
2656 metadata: Any = None,
2657) -> _CommonField:
2658 return { 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2659 'schema': schema,
2660 'validation_alias': validation_alias,
2661 'serialization_alias': serialization_alias,
2662 'serialization_exclude': serialization_exclude,
2663 'frozen': frozen,
2664 'metadata': metadata,
2665 }
def resolve_original_schema(schema: CoreSchema, definitions: _Definitions) -> CoreSchema | None:
    """Return the schema that `schema` stands for.

    - `'definition-ref'`: the schema registered in `definitions` under its `'schema_ref'`
      (whatever `get_schema_from_ref` returns for it).
    - `'definitions'`: its inner `'schema'`.
    - anything else: `schema` itself.
    """
    schema_type = schema['type']
    if schema_type == 'definition-ref':
        return definitions.get_schema_from_ref(schema['schema_ref'])
    if schema_type == 'definitions':
        return schema['schema']
    return schema
2677def _inlining_behavior( 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2678 def_ref: core_schema.DefinitionReferenceSchema,
2679) -> Literal['inline', 'keep', 'preserve_metadata']:
2680 """Determine the inlining behavior of the `'definition-ref'` schema.
2682 - If no `'serialization'` schema and no metadata is attached, the schema can safely be inlined.
2683 - If it has metadata but only related to the deferred discriminator application, it can be inlined
2684 provided that such metadata is kept.
2685 - Otherwise, the schema should not be inlined. Doing so would remove the `'serialization'` schema or metadata.
2686 """
2687 if 'serialization' in def_ref: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2688 return 'keep' 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2689 metadata = def_ref.get('metadata') 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2690 if not metadata: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2691 return 'inline' 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2692 if len(metadata) == 1 and 'pydantic_internal_union_discriminator' in metadata: 2692 ↛ 2693line 2692 didn't jump to line 2693 because the condition on line 2692 was never true1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2693 return 'preserve_metadata'
2694 return 'keep' 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2697class _Definitions: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2698 """Keeps track of references and definitions."""
2700 _recursively_seen: set[str] 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2701 """A set of recursively seen references. 1djoquwxCDelmrsyzEFGJKLMNPfnptvABHI
2703 When a referenceable type is encountered, the `get_schema_or_ref` context manager is
2704 entered to compute the reference. If the type references itself by some way (e.g. for
2705 a dataclass a Pydantic model, the class can be referenced as a field annotation),
2706 entering the context manager again will yield a `'definition-ref'` schema that should
2707 short-circuit the normal generation process, as the reference was already in this set.
2708 """
2710 _definitions: dict[str, core_schema.CoreSchema] 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2711 """A mapping of references to their corresponding schema. 1djoquwxCDelmrsyzEFGJKLMNPfnptvABHI
2713 When a schema for a referenceable type is generated, it is stored in this mapping. If the
2714 same type is encountered again, the reference is yielded by the `get_schema_or_ref` context
2715 manager.
2716 """
2718 def __init__(self) -> None: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2719 self._recursively_seen = set() 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2720 self._definitions = {} 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2722 @contextmanager 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2723 def get_schema_or_ref(self, tp: Any, /) -> Generator[tuple[str, core_schema.DefinitionReferenceSchema | None]]: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2724 """Get a definition for `tp` if one exists.
2726 If a definition exists, a tuple of `(ref_string, CoreSchema)` is returned.
2727 If no definition exists yet, a tuple of `(ref_string, None)` is returned.
2729 Note that the returned `CoreSchema` will always be a `DefinitionReferenceSchema`,
2730 not the actual definition itself.
2732 This should be called for any type that can be identified by reference.
2733 This includes any recursive types.
2735 At present the following types can be named/recursive:
2737 - Pydantic model
2738 - Pydantic and stdlib dataclasses
2739 - Typed dictionaries
2740 - Named tuples
2741 - `TypeAliasType` instances
2742 - Enums
2743 """
2744 ref = get_type_ref(tp) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2745 # return the reference if we're either (1) in a cycle or (2) it the reference was already encountered:
2746 if ref in self._recursively_seen or ref in self._definitions: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2747 yield (ref, core_schema.definition_reference_schema(ref)) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2748 else:
2749 self._recursively_seen.add(ref) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2750 try: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2751 yield (ref, None) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2752 finally:
2753 self._recursively_seen.discard(ref) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2755 def get_schema_from_ref(self, ref: str) -> CoreSchema | None: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2756 """Resolve the schema from the given reference."""
2757 return self._definitions.get(ref) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2759 def create_definition_reference_schema(self, schema: CoreSchema) -> core_schema.DefinitionReferenceSchema: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2760 """Store the schema as a definition and return a `'definition-reference'` schema pointing to it.
2762 The schema must have a reference attached to it.
2763 """
2764 ref = schema['ref'] # pyright: ignore 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2765 self._definitions[ref] = schema 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2766 return core_schema.definition_reference_schema(ref) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2768 def unpack_definitions(self, schema: core_schema.DefinitionsSchema) -> CoreSchema: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2769 """Store the definitions of the `'definitions'` core schema and return the inner core schema."""
2770 for def_schema in schema['definitions']: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2771 self._definitions[def_schema['ref']] = def_schema # pyright: ignore 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2772 return schema['schema'] 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2774 def finalize_schema(self, schema: CoreSchema) -> CoreSchema: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2775 """Finalize the core schema.
2777 This traverses the core schema and referenced definitions, replaces `'definition-ref'` schemas
2778 by the referenced definition if possible, and applies deferred discriminators.
2779 """
2780 definitions = self._definitions 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2781 try: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2782 gather_result = gather_schemas_for_cleaning( 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2783 schema,
2784 definitions=definitions,
2785 )
2786 except MissingDefinitionError as e: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2787 raise InvalidSchemaError from e 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2789 remaining_defs: dict[str, CoreSchema] = {} 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2791 # Note: this logic doesn't play well when core schemas with deferred discriminator metadata
2792 # and references are encountered. See the `test_deferred_discriminated_union_and_references()` test.
2793 for ref, inlinable_def_ref in gather_result['collected_references'].items(): 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2794 if inlinable_def_ref is not None and (inlining_behavior := _inlining_behavior(inlinable_def_ref)) != 'keep': 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2795 if inlining_behavior == 'inline': 2795 ↛ 2802line 2795 didn't jump to line 2802 because the condition on line 2795 was always true1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2796 # `ref` was encountered, and only once:
2797 # - `inlinable_def_ref` is a `'definition-ref'` schema and is guaranteed to be
2798 # the only one. Transform it into the definition it points to.
2799 # - Do not store the definition in the `remaining_defs`.
2800 inlinable_def_ref.clear() # pyright: ignore[reportAttributeAccessIssue] 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2801 inlinable_def_ref.update(self._resolve_definition(ref, definitions)) # pyright: ignore 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2802 elif inlining_behavior == 'preserve_metadata':
2803 # `ref` was encountered, and only once, but contains discriminator metadata.
2804 # We will do the same thing as if `inlining_behavior` was `'inline'`, but make
2805 # sure to keep the metadata for the deferred discriminator application logic below.
2806 meta = inlinable_def_ref.pop('metadata')
2807 inlinable_def_ref.clear() # pyright: ignore[reportAttributeAccessIssue]
2808 inlinable_def_ref.update(self._resolve_definition(ref, definitions)) # pyright: ignore
2809 inlinable_def_ref['metadata'] = meta
2810 else:
2811 # `ref` was encountered, at least two times (or only once, but with metadata or a serialization schema):
2812 # - Do not inline the `'definition-ref'` schemas (they are not provided in the gather result anyway).
2813 # - Store the the definition in the `remaining_defs`
2814 remaining_defs[ref] = self._resolve_definition(ref, definitions) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2816 for cs in gather_result['deferred_discriminator_schemas']: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2817 discriminator: str | None = cs['metadata'].pop('pydantic_internal_union_discriminator', None) # pyright: ignore[reportTypedDictNotRequiredAccess] 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2818 if discriminator is None: 2818 ↛ 2822line 2818 didn't jump to line 2822 because the condition on line 2818 was never true1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2819 # This can happen in rare scenarios, when a deferred schema is present multiple times in the
2820 # gather result (e.g. when using the `Sequence` type -- see `test_sequence_discriminated_union()`).
2821 # In this case, a previous loop iteration applied the discriminator and so we can just skip it here.
2822 continue
2823 applied = _discriminated_union.apply_discriminator(cs.copy(), discriminator, remaining_defs) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2824 # Mutate the schema directly to have the discriminator applied
2825 cs.clear() # pyright: ignore[reportAttributeAccessIssue] 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2826 cs.update(applied) # pyright: ignore 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2828 if remaining_defs: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2829 schema = core_schema.definitions_schema(schema=schema, definitions=[*remaining_defs.values()]) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2830 return schema 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2832 def _resolve_definition(self, ref: str, definitions: dict[str, CoreSchema]) -> CoreSchema: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2833 definition = definitions[ref] 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2834 if definition['type'] != 'definition-ref': 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2835 return definition 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2837 # Some `'definition-ref'` schemas might act as "intermediate" references (e.g. when using
2838 # a PEP 695 type alias (which is referenceable) that references another PEP 695 type alias):
2839 visited: set[str] = set() 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2840 while definition['type'] == 'definition-ref' and _inlining_behavior(definition) == 'inline': 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2841 schema_ref = definition['schema_ref'] 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2842 if schema_ref in visited: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2843 raise PydanticUserError( 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2844 f'{ref} contains a circular reference to itself.', code='circular-reference-schema'
2845 )
2846 visited.add(schema_ref) 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2847 definition = definitions[schema_ref] 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2848 return {**definition, 'ref': ref} # pyright: ignore[reportReturnType] 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2851class _FieldNameStack: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2852 __slots__ = ('_stack',) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2854 def __init__(self) -> None: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2855 self._stack: list[str] = [] 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2857 @contextmanager 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2858 def push(self, field_name: str) -> Iterator[None]: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2859 self._stack.append(field_name) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2860 yield 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2861 self._stack.pop() 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2863 def get(self) -> str | None: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2864 if self._stack: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2865 return self._stack[-1] 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2866 else:
2867 return None 1adouxcfpvB
2870class _ModelTypeStack: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2871 __slots__ = ('_stack',) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2873 def __init__(self) -> None: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2874 self._stack: list[type] = [] 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2876 @contextmanager 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2877 def push(self, type_obj: type) -> Iterator[None]: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2878 self._stack.append(type_obj) 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2879 yield 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2880 self._stack.pop() 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2882 def get(self) -> type | None: 1abdjoquwxCDkeghlmrsyzEFGOJKLMNPcifnptvABHI
2883 if self._stack: 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2884 return self._stack[-1] 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI
2885 else:
2886 return None 1abdjoquwxCDkeghlmrsyzEFGcifnptvABHI