Coverage for pydantic/_internal/_core_utils.py: 96.49%
367 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-03 19:29 +0000
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-03 19:29 +0000
1from __future__ import annotations 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
3import os 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
4from collections import defaultdict 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
5from typing import ( 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
6 Any,
7 Callable,
8 Hashable,
9 TypeVar,
10 Union,
11)
13from pydantic_core import CoreSchema, core_schema 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
14from pydantic_core import validate_core_schema as _validate_core_schema 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
15from typing_extensions import TypeAliasType, TypeGuard, get_args, get_origin 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
17from . import _repr 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
18from ._typing_extra import is_generic_alias 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
# Union of all four function-validator schema variants (after / before / wrap / plain).
AnyFunctionSchema = Union[
    core_schema.AfterValidatorFunctionSchema,
    core_schema.BeforeValidatorFunctionSchema,
    core_schema.WrapValidatorFunctionSchema,
    core_schema.PlainValidatorFunctionSchema,
]


# Same as `AnyFunctionSchema` minus the plain variant; these are the variants
# matched by `_FUNCTION_WITH_INNER_SCHEMA_TYPES`.
FunctionSchemaWithInnerSchema = Union[
    core_schema.AfterValidatorFunctionSchema,
    core_schema.BeforeValidatorFunctionSchema,
    core_schema.WrapValidatorFunctionSchema,
]

# Field-level entries (as opposed to full core schemas).
CoreSchemaField = Union[
    core_schema.ModelField,
    core_schema.DataclassField,
    core_schema.TypedDictField,
    core_schema.ComputedField,
]
# Anything that is either a full core schema or one of the field entries above.
CoreSchemaOrField = Union[core_schema.CoreSchema, CoreSchemaField]
# `type` discriminator values that identify field entries rather than full core schemas.
_CORE_SCHEMA_FIELD_TYPES = {
    'typed-dict-field',
    'dataclass-field',
    'model-field',
    'computed-field',
}
# `type` values of the function-validator schemas matched by `is_function_with_inner_schema`.
_FUNCTION_WITH_INNER_SCHEMA_TYPES = {
    'function-before',
    'function-after',
    'function-wrap',
}
# `type` values matched by `is_list_like_schema_with_items_schema`.
_LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES = {
    'list',
    'set',
    'frozenset',
}

TAGGED_UNION_TAG_KEY = 'pydantic.internal.tagged_union_tag'
"""
Used in a `Tag` schema to specify the tag used for a discriminated union.
"""
HAS_INVALID_SCHEMAS_METADATA_KEY = 'pydantic.internal.invalid'
"""Used to mark a schema that is invalid because it refers to a definition that was not yet defined when the
schema was first encountered.
"""
def is_core_schema(
    schema: CoreSchemaOrField,
) -> TypeGuard[CoreSchema]:
    """Return True when `schema['type']` is not one of the field-entry types.

    Acts as a type guard narrowing `schema` to `CoreSchema`.
    """
    schema_type = schema['type']
    return schema_type not in _CORE_SCHEMA_FIELD_TYPES
def is_core_schema_field(
    schema: CoreSchemaOrField,
) -> TypeGuard[CoreSchemaField]:
    """Return True when `schema['type']` is one of the field-entry types.

    Acts as a type guard narrowing `schema` to `CoreSchemaField`.
    """
    schema_type = schema['type']
    return schema_type in _CORE_SCHEMA_FIELD_TYPES
def is_function_with_inner_schema(
    schema: CoreSchemaOrField,
) -> TypeGuard[FunctionSchemaWithInnerSchema]:
    """Return True when `schema['type']` is 'function-before', 'function-after' or 'function-wrap'.

    Acts as a type guard narrowing `schema` to `FunctionSchemaWithInnerSchema`.
    """
    schema_type = schema['type']
    return schema_type in _FUNCTION_WITH_INNER_SCHEMA_TYPES
def is_list_like_schema_with_items_schema(
    schema: CoreSchema,
) -> TypeGuard[core_schema.ListSchema | core_schema.SetSchema | core_schema.FrozenSetSchema]:
    """Return True when `schema['type']` is 'list', 'set' or 'frozenset'.

    Acts as a type guard narrowing `schema` to the list, set or frozenset schema types.
    """
    schema_type = schema['type']
    return schema_type in _LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES
def get_type_ref(type_: type[Any], args_override: tuple[type[Any], ...] | None = None) -> str:
    """Build the reference string used for `type_` in pydantic_core's core schemas.

    The result has the shape `<module>.<name>:<id(origin)>`, followed by a
    bracketed, comma-separated list of per-argument refs when the type has
    arguments.

    `args_override` supplies the arguments when `type_` is not a generic alias;
    it exists so that valid recursive references can be created for generic
    models without having to create a concrete class.
    """
    origin = get_origin(type_) or type_

    if is_generic_alias(type_):
        args = get_args(type_)
    else:
        args = args_override or ()

    # Pydantic generic metadata, when present and non-empty, takes precedence over
    # what was derived from the typing machinery above.
    generic_metadata = getattr(type_, '__pydantic_generic_metadata__', None)
    if generic_metadata:
        origin = generic_metadata['origin'] or origin
        args = generic_metadata['args'] or args

    module_name = getattr(origin, '__module__', '<No __module__>')
    if isinstance(origin, TypeAliasType):
        display_name = origin.__name__
    else:
        try:
            # The fallback f-string is evaluated eagerly, so formatting `origin` may itself raise.
            display_name = getattr(origin, '__qualname__', f'<No __qualname__: {origin}>')
        except Exception:
            display_name = getattr(origin, '__qualname__', '<No __qualname__>')
    type_ref = f'{module_name}.{display_name}:{id(origin)}'

    # String arguments are handled as a special case; this special handling may become
    # removable if they are wrapped in a ForwardRef at some point.
    arg_refs: list[str] = [
        f'{arg}:str-{id(arg)}' if isinstance(arg, str) else f'{_repr.display_as_type(arg)}:{id(arg)}'
        for arg in args
    ]
    if not arg_refs:
        return type_ref
    joined_args = ','.join(arg_refs)
    return f'{type_ref}[{joined_args}]'
def get_ref(s: core_schema.CoreSchema) -> None | str:
    """Return the schema's `'ref'` entry, or None when it has none.

    This helper exists only so that type checking works correctly.
    """
    return s.get('ref')
def collect_definitions(schema: core_schema.CoreSchema) -> dict[str, core_schema.CoreSchema]:
    """Walk `schema` and return a mapping of every truthy `ref` to the schema carrying it.

    When the same ref is met more than once, the last schema visited wins.
    """
    schemas_by_ref: dict[str, CoreSchema] = {}

    def _remember_ref(current: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        current_ref = get_ref(current)
        if current_ref:
            schemas_by_ref[current_ref] = current
        # Always keep descending so nested refs are recorded too.
        return recurse(current, _remember_ref)

    walk_core_schema(schema, _remember_ref)

    return schemas_by_ref
def define_expected_missing_refs(
    schema: core_schema.CoreSchema, allowed_missing_refs: set[str]
) -> core_schema.CoreSchema | None:
    """Wrap `schema` with placeholder definitions for allowed refs that it does not define.

    Args:
        schema: The schema to inspect.
        allowed_missing_refs: Refs that are permitted to be absent from `schema`.

    Returns:
        A definitions schema wrapping `schema` with one placeholder `none` schema
        (flagged with `HAS_INVALID_SCHEMAS_METADATA_KEY`) per missing ref, or None
        when there is nothing to add.
    """
    if not allowed_missing_refs:
        # in this case, there are no missing refs to potentially substitute, so there's no need to walk the schema
        # this is a common case (will be hit for all non-generic models), so it's worth optimizing for
        return None

    defined_refs = collect_definitions(schema).keys()
    still_missing = allowed_missing_refs.difference(defined_refs)
    if not still_missing:
        return None

    # TODO: Replace this with a (new) CoreSchema that, if present at any level, makes validation fail
    #   Issue: https://github.com/pydantic/pydantic-core/issues/619
    placeholders: list[core_schema.CoreSchema] = [
        core_schema.none_schema(ref=ref, metadata={HAS_INVALID_SCHEMAS_METADATA_KEY: True})
        for ref in still_missing
    ]
    return core_schema.definitions_schema(schema, placeholders)
def collect_invalid_schemas(schema: core_schema.CoreSchema) -> bool:
    """Report whether any schema reachable from `schema` is flagged as invalid.

    A schema is considered flagged when its `metadata` contains
    `HAS_INVALID_SCHEMAS_METADATA_KEY` with a truthy value. The walk does not
    descend into a schema whose metadata carries the key.

    Returns:
        True if at least one flagged schema was found, False otherwise.
    """
    invalid = False

    def _is_schema_valid(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        nonlocal invalid
        if 'metadata' in s:
            metadata = s['metadata']
            if HAS_INVALID_SCHEMAS_METADATA_KEY in metadata:
                # Keep the flag sticky: once an invalid schema has been seen, a later schema
                # carrying a falsy marker must not reset the overall result.
                invalid = invalid or bool(metadata[HAS_INVALID_SCHEMAS_METADATA_KEY])
                return s
        return recurse(s, _is_schema_valid)

    walk_core_schema(schema, _is_schema_valid)
    return invalid
T = TypeVar('T')


# `Recurse` is the type of the second argument handed to a walk callback (the "descend
# further" function); `Walk` is the type of the callback itself. Both take a schema plus
# the other kind of callable and return a schema, hence the forward reference to 'Walk'.
Recurse = Callable[[core_schema.CoreSchema, 'Walk'], core_schema.CoreSchema]
Walk = Callable[[core_schema.CoreSchema, Recurse], core_schema.CoreSchema]
180# TODO: Should we move _WalkCoreSchema into pydantic_core proper?
181# Issue: https://github.com/pydantic/pydantic-core/issues/615
class _WalkCoreSchema:
    """Traverse a core schema tree, applying a user callback at every nested schema.

    Each schema type is routed to a `handle_<type>_schema` method when one exists
    (dashes in the type name become underscores); every other type goes through
    `_handle_other_schemas`. Handlers replace child schemas in place on the dict
    they are given and return it; `_walk` hands them a shallow copy.
    """

    def __init__(self):
        self._schema_type_to_method = self._build_schema_type_to_method()

    def _build_schema_type_to_method(self) -> dict[core_schema.CoreSchemaType, Recurse]:
        """Build the schema-type -> handler table, falling back to `_handle_other_schemas`."""
        return {
            key: getattr(self, f"handle_{key.replace('-', '_')}_schema", self._handle_other_schemas)
            for key in get_args(core_schema.CoreSchemaType)
        }

    def walk(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        """Apply `f` to `schema`, giving it `_walk` as the function that descends further."""
        return f(schema, self._walk)

    def _walk(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        """Dispatch a shallow copy of `schema` to its handler, then walk its serialization schema."""
        handler = self._schema_type_to_method[schema['type']]
        walked = handler(schema.copy(), f)
        ser_schema: core_schema.SerSchema | None = walked.get('serialization')  # type: ignore
        if ser_schema:
            walked['serialization'] = self._handle_ser_schemas(ser_schema, f)
        return walked

    def _walk_optional_child(self, container: Any, key: str, f: Walk) -> Any:
        """Replace `container[key]` with its walked version when present and not None."""
        child = container.get(key)
        if child is not None:
            container[key] = self.walk(child, f)
        return container

    def _copy_with_walked(self, entry: Any, key: str, f: Walk) -> Any:
        """Return a shallow copy of `entry` whose `key` holds the walked version of `entry[key]`."""
        clone = entry.copy()
        clone[key] = self.walk(entry[key], f)
        return clone

    def _walk_computed_fields(self, schema: Any, f: Walk) -> None:
        """Walk the `return_schema` of every computed field; only reassign when there are any."""
        walked_fields: list[core_schema.ComputedField] = [
            self._copy_with_walked(computed_field, 'return_schema', f)
            for computed_field in schema.get('computed_fields', ())
        ]
        if walked_fields:
            schema['computed_fields'] = walked_fields

    def _handle_other_schemas(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        """Fallback handler: walk the generic `'schema'` child, if there is one."""
        return self._walk_optional_child(schema, 'schema', f)

    def _handle_ser_schemas(self, ser_schema: core_schema.SerSchema, f: Walk) -> core_schema.SerSchema:
        """Walk the `'schema'` and `'return_schema'` children of a serialization schema in place."""
        self._walk_optional_child(ser_schema, 'schema', f)
        self._walk_optional_child(ser_schema, 'return_schema', f)
        return ser_schema

    def handle_definitions_schema(self, schema: core_schema.DefinitionsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every definition and the inner schema, returning a rebuilt definitions schema."""
        kept_definitions: list[core_schema.CoreSchema] = []
        for definition in schema['definitions']:
            purposely_indirect = 'schema_ref' in definition and 'ref' in definition
            if purposely_indirect:
                # This indicates a purposely indirect reference
                # We want to keep such references around for implications related to JSON schema, etc.:
                kept_definitions.append(definition)
                # However, we still need to walk the referenced definition:
                self.walk(definition, f)
            else:
                walked_definition = self.walk(definition, f)
                # If the updated definition schema doesn't have a 'ref', it shouldn't go in the definitions
                # This is most likely to happen due to replacing something with a definition reference, in
                # which case it should certainly not go in the definitions list
                if 'ref' in walked_definition:
                    kept_definitions.append(walked_definition)

        walked_inner = self.walk(schema['schema'], f)

        if len(schema) == 3 and not kept_definitions:
            # This means we'd be returning a "trivial" definitions schema that just wrapped the inner schema
            return walked_inner

        rebuilt = schema.copy()
        rebuilt['schema'] = walked_inner
        rebuilt['definitions'] = kept_definitions
        return rebuilt

    def handle_list_schema(self, schema: core_schema.ListSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `items_schema` of a list schema."""
        return self._walk_optional_child(schema, 'items_schema', f)

    def handle_set_schema(self, schema: core_schema.SetSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `items_schema` of a set schema."""
        return self._walk_optional_child(schema, 'items_schema', f)

    def handle_frozenset_schema(self, schema: core_schema.FrozenSetSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `items_schema` of a frozenset schema."""
        return self._walk_optional_child(schema, 'items_schema', f)

    def handle_generator_schema(self, schema: core_schema.GeneratorSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `items_schema` of a generator schema."""
        return self._walk_optional_child(schema, 'items_schema', f)

    def handle_tuple_schema(self, schema: core_schema.TupleSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every entry of the tuple's `items_schema` list."""
        schema['items_schema'] = [self.walk(item, f) for item in schema['items_schema']]
        return schema

    def handle_dict_schema(self, schema: core_schema.DictSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `keys_schema` and `values_schema` of a dict schema."""
        self._walk_optional_child(schema, 'keys_schema', f)
        values_schema = schema.get('values_schema')
        # Truthiness check (not `is not None`) on purpose, to mirror the existing behavior.
        if values_schema:
            schema['values_schema'] = self.walk(values_schema, f)
        return schema

    def handle_function_schema(self, schema: AnyFunctionSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the inner schema of a function schema when it has one."""
        if is_function_with_inner_schema(schema):
            schema['schema'] = self.walk(schema['schema'], f)
        return schema

    def handle_union_schema(self, schema: core_schema.UnionSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every union choice; `(schema, label)` tuples keep their second element."""
        schema['choices'] = [
            (self.walk(choice[0], f), choice[1]) if isinstance(choice, tuple) else self.walk(choice, f)
            for choice in schema['choices']
        ]
        return schema

    def handle_tagged_union_schema(self, schema: core_schema.TaggedUnionSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every tagged-union choice, leaving `str` / `int` choice values untouched."""
        schema['choices'] = {
            tag: choice if isinstance(choice, (str, int)) else self.walk(choice, f)
            for tag, choice in schema['choices'].items()
        }
        return schema

    def handle_chain_schema(self, schema: core_schema.ChainSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every step of a chain schema."""
        schema['steps'] = [self.walk(step, f) for step in schema['steps']]
        return schema

    def handle_lax_or_strict_schema(self, schema: core_schema.LaxOrStrictSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the `lax_schema`, then the `strict_schema`."""
        for key in ('lax_schema', 'strict_schema'):
            schema[key] = self.walk(schema[key], f)
        return schema

    def handle_json_or_python_schema(self, schema: core_schema.JsonOrPythonSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the `json_schema`, then the `python_schema`."""
        for key in ('json_schema', 'python_schema'):
            schema[key] = self.walk(schema[key], f)
        return schema

    def handle_model_fields_schema(self, schema: core_schema.ModelFieldsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the extras schema, the computed fields, then the regular fields, in that order."""
        self._walk_optional_child(schema, 'extras_schema', f)
        self._walk_computed_fields(schema, f)
        schema['fields'] = {
            name: self._copy_with_walked(field, 'schema', f) for name, field in schema['fields'].items()
        }
        return schema

    def handle_typed_dict_schema(self, schema: core_schema.TypedDictSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the extras schema, the computed fields, then the regular fields, in that order."""
        self._walk_optional_child(schema, 'extras_schema', f)
        self._walk_computed_fields(schema, f)
        schema['fields'] = {
            name: self._copy_with_walked(field, 'schema', f) for name, field in schema['fields'].items()
        }
        return schema

    def handle_dataclass_args_schema(self, schema: core_schema.DataclassArgsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the computed fields, then the dataclass fields."""
        self._walk_computed_fields(schema, f)
        schema['fields'] = [self._copy_with_walked(field, 'schema', f) for field in schema['fields']]
        return schema

    def handle_arguments_schema(self, schema: core_schema.ArgumentsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every parameter schema, then the var-args and var-kwargs schemas when present."""
        schema['arguments_schema'] = [
            self._copy_with_walked(param, 'schema', f) for param in schema['arguments_schema']
        ]
        for key in ('var_args_schema', 'var_kwargs_schema'):
            if key in schema:
                schema[key] = self.walk(schema[key], f)
        return schema

    def handle_call_schema(self, schema: core_schema.CallSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the arguments schema and, when present, the return schema."""
        schema['arguments_schema'] = self.walk(schema['arguments_schema'], f)
        if 'return_schema' in schema:
            schema['return_schema'] = self.walk(schema['return_schema'], f)
        return schema
# Bound `walk` method of a single shared walker instance, built once at import time.
_dispatch = _WalkCoreSchema().walk


def walk_core_schema(schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
    """Recursively traverse a CoreSchema.

    Args:
        schema (core_schema.CoreSchema): The CoreSchema to process; `f` receives a shallow copy of it
            rather than the object itself.
        f (Walk): A function to apply. This function takes two arguments:
            1. The current CoreSchema that is being processed
               (not the same one you passed into this function, one level down).
            2. The "next" `f` to call. This lets you for example use `f=functools.partial(some_method, some_context)`
               to pass data down the recursive calls without using globals or other mutable state.

    Returns:
        core_schema.CoreSchema: A processed CoreSchema.
    """
    top_level_copy = schema.copy()
    return f(top_level_copy, _dispatch)
417def simplify_schema_references(schema: core_schema.CoreSchema) -> core_schema.CoreSchema: # noqa: C901 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
418 definitions: dict[str, core_schema.CoreSchema] = {} 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
419 ref_counts: dict[str, int] = defaultdict(int) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
420 involved_in_recursion: dict[str, bool] = {} 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
421 current_recursion_ref_count: dict[str, int] = defaultdict(int) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
423 def collect_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
424 if s['type'] == 'definitions': 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
425 for definition in s['definitions']: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
426 ref = get_ref(definition) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
427 assert ref is not None 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
428 if ref not in definitions: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
429 definitions[ref] = definition 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
430 recurse(definition, collect_refs) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
431 return recurse(s['schema'], collect_refs) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
432 else:
433 ref = get_ref(s) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
434 if ref is not None: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
435 new = recurse(s, collect_refs) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
436 new_ref = get_ref(new) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
437 if new_ref: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
438 definitions[new_ref] = new 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
439 return core_schema.definition_reference_schema(schema_ref=ref) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
440 else:
441 return recurse(s, collect_refs) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
443 schema = walk_core_schema(schema, collect_refs) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
445 def count_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
446 if s['type'] != 'definition-ref': 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
447 return recurse(s, count_refs) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
448 ref = s['schema_ref'] 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
449 ref_counts[ref] += 1 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
451 if ref_counts[ref] >= 2: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
452 # If this model is involved in a recursion this should be detected
453 # on its second encounter, we can safely stop the walk here.
454 if current_recursion_ref_count[ref] != 0: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
455 involved_in_recursion[ref] = True 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
456 return s 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
458 current_recursion_ref_count[ref] += 1 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
459 recurse(definitions[ref], count_refs) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
460 current_recursion_ref_count[ref] -= 1 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
461 return s 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
463 schema = walk_core_schema(schema, count_refs) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
465 assert all(c == 0 for c in current_recursion_ref_count.values()), 'this is a bug! please report it' 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
467 def can_be_inlined(s: core_schema.DefinitionReferenceSchema, ref: str) -> bool: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
468 if ref_counts[ref] > 1: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
469 return False 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
470 if involved_in_recursion.get(ref, False): 470 ↛ 471line 470 didn't jump to line 471 because the condition on line 470 was never true1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
471 return False
472 if 'serialization' in s: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
473 return False 1zABCabcdefghLyDEFGijklmnopHIJKqrstuvwx
474 if 'metadata' in s: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
475 metadata = s['metadata'] 1zABCabcdefghLyDEFGijklmnopHIJKqrstuvwx
476 for k in ( 1zABCabcdefghLyDEFGijklmnopHIJKqrstuvwx
477 'pydantic_js_functions',
478 'pydantic_js_annotation_functions',
479 'pydantic.internal.union_discriminator',
480 ):
481 if k in metadata: 1zABCabcdefghLyDEFGijklmnopHIJKqrstuvwx
482 # we need to keep this as a ref
483 return False 1zABCabcdefghLyDEFGijklmnopHIJKqrstuvwx
484 return True 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
486 def inline_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
487 if s['type'] == 'definition-ref': 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
488 ref = s['schema_ref'] 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
489 # Check if the reference is only used once, not involved in recursion and does not have
490 # any extra keys (like 'serialization')
491 if can_be_inlined(s, ref): 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
492 # Inline the reference by replacing the reference with the actual schema
493 new = definitions.pop(ref) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
494 ref_counts[ref] -= 1 # because we just replaced it! 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
495 # put all other keys that were on the def-ref schema into the inlined version
496 # in particular this is needed for `serialization`
497 if 'serialization' in s: 497 ↛ 498line 497 didn't jump to line 498 because the condition on line 497 was never true1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
498 new['serialization'] = s['serialization']
499 s = recurse(new, inline_refs) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
500 return s 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
501 else:
502 return recurse(s, inline_refs) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
503 else:
504 return recurse(s, inline_refs) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
506 schema = walk_core_schema(schema, inline_refs) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
508 def_values = [v for v in definitions.values() if ref_counts[v['ref']] > 0] # type: ignore 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
510 if def_values: 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
511 schema = core_schema.definitions_schema(schema=schema, definitions=def_values) 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
512 return schema 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx
def _strip_metadata(schema: CoreSchema) -> CoreSchema:
    """Return a version of `schema` with `metadata` entries and some default-valued keys removed.

    The stripping callback is applied through `walk_core_schema`. Every schema it
    visits is shallow-copied before being edited, and field / computed-field dicts
    are copied as well, so the dicts passed in by the caller are left untouched.

    Args:
        schema: The core schema to clean up.

    Returns:
        Whatever `walk_core_schema` returns for the stripped schema.
    """

    def strip_metadata(s: CoreSchema, recurse: Recurse) -> CoreSchema:
        # Shallow copy first: all pops below act on the copy, never on the input dict.
        s = s.copy()
        s.pop('metadata', None)
        if s['type'] == 'model-fields':
            # Fields are nested dicts, so they need their own copies before stripping.
            fields = {name: field.copy() for name, field in s['fields'].items()}
            for field in fields.values():
                field.pop('metadata', None)
            s['fields'] = fields

            computed_fields = s.get('computed_fields', None)
            if computed_fields:
                # Strip the copies, not the originals: popping from the original
                # list would leave metadata in the result and mutate the caller's schema.
                stripped_computed_fields = [cf.copy() for cf in computed_fields]
                for cf in stripped_computed_fields:
                    cf.pop('metadata', None)
                s['computed_fields'] = stripped_computed_fields
            else:
                # Drop a missing/empty entry entirely to keep the output compact.
                s.pop('computed_fields', None)
        elif s['type'] == 'model':
            # remove some defaults
            if s.get('custom_init', True) is False:
                s.pop('custom_init')
            if s.get('root_model', True) is False:
                s.pop('root_model')
            # A config that is absent, empty, or holds nothing but 'title' is dropped.
            if {'title'}.issuperset(s.get('config', {}).keys()):
                s.pop('config', None)

        return recurse(s, strip_metadata)

    return walk_core_schema(schema, strip_metadata)
def pretty_print_core_schema(
    schema: CoreSchema,
    include_metadata: bool = False,
) -> None:
    """Print a CoreSchema to the console with rich's `print`.

    Meant as a debugging aid; `rich` is imported lazily inside the function.

    Args:
        schema: The CoreSchema to print.
        include_metadata: When `False` (the default), the schema is passed through
            `_strip_metadata` before being printed. When `True`, it is printed as given.
    """
    from rich import print as rich_print  # type: ignore  # install it manually in your dev env

    printable = schema if include_metadata else _strip_metadata(schema)
    return rich_print(printable)
def validate_core_schema(schema: CoreSchema) -> CoreSchema:
    """Validate `schema` with pydantic-core unless validation is disabled via the environment.

    If the `PYDANTIC_SKIP_VALIDATING_CORE_SCHEMAS` environment variable is present
    (only its presence is checked, not its value), `schema` is returned as-is.
    Otherwise the result of `_validate_core_schema(schema)` is returned.
    """
    skip_validation = 'PYDANTIC_SKIP_VALIDATING_CORE_SCHEMAS' in os.environ
    return schema if skip_validation else _validate_core_schema(schema)