Coverage for pydantic/_internal/_core_utils.py: 96.49%
367 statements
« prev ^ index » next coverage.py v7.5.3, created at 2024-06-21 17:00 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2024-06-21 17:00 +0000
1from __future__ import annotations 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
3import os 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
4from collections import defaultdict 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
5from typing import ( 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
6 Any,
7 Callable,
8 Hashable,
9 TypeVar,
10 Union,
11)
13from pydantic_core import CoreSchema, core_schema 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
14from pydantic_core import validate_core_schema as _validate_core_schema 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
15from typing_extensions import TypeAliasType, TypeGuard, get_args, get_origin 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
17from . import _repr 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
18from ._typing_extra import is_generic_alias 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
# Union of the four validator-function schema variants: after, before, wrap and plain.
AnyFunctionSchema = Union[
    core_schema.AfterValidatorFunctionSchema,
    core_schema.BeforeValidatorFunctionSchema,
    core_schema.WrapValidatorFunctionSchema,
    core_schema.PlainValidatorFunctionSchema,
]


# Same union as `AnyFunctionSchema` minus `PlainValidatorFunctionSchema`;
# `is_function_with_inner_schema` narrows to this type.
FunctionSchemaWithInnerSchema = Union[
    core_schema.AfterValidatorFunctionSchema,
    core_schema.BeforeValidatorFunctionSchema,
    core_schema.WrapValidatorFunctionSchema,
]

# Field-level schema variants, and the union of "either a core schema or one of those fields".
CoreSchemaField = Union[
    core_schema.ModelField, core_schema.DataclassField, core_schema.TypedDictField, core_schema.ComputedField
]
CoreSchemaOrField = Union[core_schema.CoreSchema, CoreSchemaField]

# Sets of `'type'` values consulted by the `is_*` predicate helpers in this module.
_CORE_SCHEMA_FIELD_TYPES = {'typed-dict-field', 'dataclass-field', 'model-field', 'computed-field'}
_FUNCTION_WITH_INNER_SCHEMA_TYPES = {'function-before', 'function-after', 'function-wrap'}
_LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES = {'list', 'set', 'frozenset'}

TAGGED_UNION_TAG_KEY = 'pydantic.internal.tagged_union_tag'
"""
Used in a `Tag` schema to specify the tag used for a discriminated union.
"""
HAS_INVALID_SCHEMAS_METADATA_KEY = 'pydantic.internal.invalid'
"""Used to mark a schema that is invalid because it refers to a definition that was not yet defined when the
schema was first encountered.
"""
def is_core_schema(
    schema: CoreSchemaOrField,
) -> TypeGuard[CoreSchema]:
    """Return whether `schema` is a core schema rather than a field schema.

    The decision is based solely on the schema's `'type'` value: anything not listed in
    `_CORE_SCHEMA_FIELD_TYPES` is treated as a core schema.
    """
    schema_type = schema['type']
    return schema_type not in _CORE_SCHEMA_FIELD_TYPES
def is_core_schema_field(
    schema: CoreSchemaOrField,
) -> TypeGuard[CoreSchemaField]:
    """Return whether `schema` is a field schema.

    A schema counts as a field when its `'type'` value is listed in `_CORE_SCHEMA_FIELD_TYPES`.
    """
    schema_type = schema['type']
    return schema_type in _CORE_SCHEMA_FIELD_TYPES
def is_function_with_inner_schema(
    schema: CoreSchemaOrField,
) -> TypeGuard[FunctionSchemaWithInnerSchema]:
    """Return whether `schema` is a before/after/wrap validator-function schema.

    The check looks only at the `'type'` value and compares it against
    `_FUNCTION_WITH_INNER_SCHEMA_TYPES`.
    """
    schema_type = schema['type']
    return schema_type in _FUNCTION_WITH_INNER_SCHEMA_TYPES
def is_list_like_schema_with_items_schema(
    schema: CoreSchema,
) -> TypeGuard[core_schema.ListSchema | core_schema.SetSchema | core_schema.FrozenSetSchema]:
    """Return whether `schema` is a list, set or frozenset schema.

    The check looks only at the `'type'` value and compares it against
    `_LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES`.
    """
    schema_type = schema['type']
    return schema_type in _LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES
def get_type_ref(type_: type[Any], args_override: tuple[type[Any], ...] | None = None) -> str:
    """Build the reference string used for `type_` in pydantic_core's core schemas.

    The result has the form `<module>.<name>:<id>` and, when type arguments are present, is followed by a
    bracketed, comma-separated list of `<arg display>:<id>` entries.

    `args_override` provides the type arguments when `type_` is not itself a generic alias. It was added so
    that valid recursive references can be created for generic models without creating a concrete class.
    """
    origin = get_origin(type_) or type_

    if is_generic_alias(type_):
        args = get_args(type_)
    else:
        args = args_override or ()

    generic_metadata = getattr(type_, '__pydantic_generic_metadata__', None)
    if generic_metadata:
        # Generic metadata wins; an empty entry falls back to what was computed above
        origin = generic_metadata['origin'] or origin
        args = generic_metadata['args'] or args

    module_name = getattr(origin, '__module__', '<No __module__>')
    if isinstance(origin, TypeAliasType):
        name = origin.__name__
    else:
        try:
            # The default is formatted eagerly, so a failing `str(origin)` lands in the except branch
            name = getattr(origin, '__qualname__', f'<No __qualname__: {origin}>')
        except Exception:
            name = getattr(origin, '__qualname__', '<No __qualname__>')
    type_ref = f'{module_name}.{name}:{id(origin)}'

    # String literals are special-cased; this may become unnecessary if they are ever wrapped
    # in a ForwardRef.
    arg_refs: list[str] = [
        f'{arg}:str-{id(arg)}' if isinstance(arg, str) else f'{_repr.display_as_type(arg)}:{id(arg)}'
        for arg in args
    ]
    if not arg_refs:
        return type_ref
    return f'{type_ref}[{",".join(arg_refs)}]'
def get_ref(s: core_schema.CoreSchema) -> None | str:
    """Return the value stored under the schema's `'ref'` key, or `None` if the key is absent.

    This accessor exists so that type checking works correctly at call sites.
    """
    return s.get('ref')
def collect_definitions(schema: core_schema.CoreSchema) -> dict[str, core_schema.CoreSchema]:
    """Walk `schema` and map every truthy `ref` encountered to the schema that carries it.

    If the same ref is visited more than once, the schema visited last is the one kept.
    """
    found: dict[str, CoreSchema] = {}

    def _visit(node: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        node_ref = get_ref(node)
        if node_ref:
            found[node_ref] = node
        return recurse(node, _visit)

    walk_core_schema(schema, _visit)

    return found
def define_expected_missing_refs(
    schema: core_schema.CoreSchema, allowed_missing_refs: set[str]
) -> core_schema.CoreSchema | None:
    """Wrap `schema` with placeholder definitions for allowed refs that it does not define.

    Returns `None` when `allowed_missing_refs` is empty or when every allowed ref is already defined
    somewhere in `schema`. Otherwise returns a definitions schema wrapping `schema`, with one placeholder
    `none` schema per missing ref, each flagged via `HAS_INVALID_SCHEMAS_METADATA_KEY`.
    """
    if not allowed_missing_refs:
        # Nothing could be substituted, so skip walking the schema altogether. This is the common case
        # (hit for all non-generic models), which makes the shortcut worthwhile.
        return None

    defined_refs = collect_definitions(schema).keys()
    expected_missing_refs = allowed_missing_refs.difference(defined_refs)
    if not expected_missing_refs:
        return None

    placeholders: list[core_schema.CoreSchema] = []
    for ref in expected_missing_refs:
        # TODO: Replace this with a (new) CoreSchema that, if present at any level, makes validation fail
        #   Issue: https://github.com/pydantic/pydantic-core/issues/619
        placeholders.append(core_schema.none_schema(ref=ref, metadata={HAS_INVALID_SCHEMAS_METADATA_KEY: True}))
    return core_schema.definitions_schema(schema, placeholders)
def collect_invalid_schemas(schema: core_schema.CoreSchema) -> bool:
    """Walk `schema` and report the invalid marker found in schema metadata.

    Starts from `False`. Each visited schema whose metadata contains `HAS_INVALID_SCHEMAS_METADATA_KEY`
    overwrites the result with the stored value, and the walk does not descend below that schema.
    """
    invalid = False

    def _check(node: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        nonlocal invalid
        if 'metadata' in node and HAS_INVALID_SCHEMAS_METADATA_KEY in node['metadata']:
            invalid = node['metadata'][HAS_INVALID_SCHEMAS_METADATA_KEY]
            # Marked schema found: return it untouched instead of recursing further
            return node
        return recurse(node, _check)

    walk_core_schema(schema, _check)
    return invalid
T = TypeVar('T')


# `Recurse` and `Walk` are mutually referential callable signatures:
# - a `Walk` callable takes a schema and a `Recurse` callable and returns a schema;
# - a `Recurse` callable takes a schema and a `Walk` callable and returns a schema.
# `Walk` is referenced as a string in `Recurse` because it is only defined on the following line.
Recurse = Callable[[core_schema.CoreSchema, 'Walk'], core_schema.CoreSchema]
Walk = Callable[[core_schema.CoreSchema, Recurse], core_schema.CoreSchema]
180# TODO: Should we move _WalkCoreSchema into pydantic_core proper?
181# Issue: https://github.com/pydantic/pydantic-core/issues/615
class _WalkCoreSchema:
    """Dispatching walker that applies a `Walk` callback to a schema and its nested schemas.

    Each schema type is routed to a `handle_<type>_schema` method when one exists (dashes in the type name
    become underscores), and to `_handle_other_schemas` otherwise. Handlers receive a shallow copy of the
    schema and replace nested schemas on that copy, so the schema passed in is not updated in place.
    """

    def __init__(self):
        self._schema_type_to_method = self._build_schema_type_to_method()

    def _build_schema_type_to_method(self) -> dict[core_schema.CoreSchemaType, Recurse]:
        """Build the mapping from every `CoreSchemaType` value to the method that handles it."""
        mapping: dict[core_schema.CoreSchemaType, Recurse] = {}
        key: core_schema.CoreSchemaType
        for key in get_args(core_schema.CoreSchemaType):
            method_name = f"handle_{key.replace('-', '_')}_schema"
            # Types without a dedicated handler share the generic fallback
            mapping[key] = getattr(self, method_name, self._handle_other_schemas)
        return mapping

    def walk(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        """Apply `f` to `schema`, giving it `self._walk` as the callable that recurses into children."""
        return f(schema, self._walk)

    def _walk(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        """Dispatch a shallow copy of `schema` to its handler, then process its `serialization` schema."""
        # Handlers assign into the schema they receive, hence the copy
        schema = self._schema_type_to_method[schema['type']](schema.copy(), f)
        ser_schema: core_schema.SerSchema | None = schema.get('serialization')  # type: ignore
        if ser_schema:
            schema['serialization'] = self._handle_ser_schemas(ser_schema, f)
        return schema

    def _handle_other_schemas(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        """Fallback handler: walk the nested `'schema'` entry when there is one."""
        sub_schema = schema.get('schema', None)
        if sub_schema is not None:
            schema['schema'] = self.walk(sub_schema, f)  # type: ignore
        return schema

    def _handle_ser_schemas(self, ser_schema: core_schema.SerSchema, f: Walk) -> core_schema.SerSchema:
        """Walk the `schema` and `return_schema` entries of a serialization schema.

        The serialization schema passed in is never modified. When a nested schema has to be replaced,
        the replacement is stored on a shallow copy, which is what gets returned. When there is nothing
        to walk, the original object is returned as-is.
        """
        schema: core_schema.CoreSchema | None = ser_schema.get('schema', None)
        return_schema: core_schema.CoreSchema | None = ser_schema.get('return_schema', None)
        if schema is None and return_schema is None:
            return ser_schema

        # `_walk` only shallow-copies the outer schema, so `ser_schema` is still the caller's object;
        # copy it before assigning so the walked input is left untouched.
        ser_schema = ser_schema.copy()
        if schema is not None:
            ser_schema['schema'] = self.walk(schema, f)  # type: ignore
        if return_schema is not None:
            ser_schema['return_schema'] = self.walk(return_schema, f)  # type: ignore
        return ser_schema

    def handle_definitions_schema(self, schema: core_schema.DefinitionsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every definition and the inner schema of a definitions schema.

        Returns the walked inner schema alone when no definitions remain and the schema has exactly three
        keys; otherwise returns a copy with the walked inner schema and the rebuilt definitions list.
        """
        new_definitions: list[core_schema.CoreSchema] = []
        for definition in schema['definitions']:
            if 'schema_ref' in definition and 'ref' in definition:
                # This indicates a purposely indirect reference
                # We want to keep such references around for implications related to JSON schema, etc.:
                new_definitions.append(definition)
                # However, we still need to walk the referenced definition:
                self.walk(definition, f)
                continue

            updated_definition = self.walk(definition, f)
            if 'ref' in updated_definition:
                # If the updated definition schema doesn't have a 'ref', it shouldn't go in the definitions
                # This is most likely to happen due to replacing something with a definition reference, in
                # which case it should certainly not go in the definitions list
                new_definitions.append(updated_definition)
        new_inner_schema = self.walk(schema['schema'], f)

        if not new_definitions and len(schema) == 3:
            # This means we'd be returning a "trivial" definitions schema that just wrapped the inner schema
            return new_inner_schema

        new_schema = schema.copy()
        new_schema['schema'] = new_inner_schema
        new_schema['definitions'] = new_definitions
        return new_schema

    def handle_list_schema(self, schema: core_schema.ListSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `items_schema` of a list schema."""
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_set_schema(self, schema: core_schema.SetSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `items_schema` of a set schema."""
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_frozenset_schema(self, schema: core_schema.FrozenSetSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `items_schema` of a frozenset schema."""
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_generator_schema(self, schema: core_schema.GeneratorSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `items_schema` of a generator schema."""
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_tuple_schema(self, schema: core_schema.TupleSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk each entry of a tuple schema's `items_schema` list."""
        schema['items_schema'] = [self.walk(v, f) for v in schema['items_schema']]
        return schema

    def handle_dict_schema(self, schema: core_schema.DictSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `keys_schema` and `values_schema` of a dict schema."""
        keys_schema = schema.get('keys_schema')
        if keys_schema is not None:
            schema['keys_schema'] = self.walk(keys_schema, f)
        values_schema = schema.get('values_schema')
        if values_schema:
            schema['values_schema'] = self.walk(values_schema, f)
        return schema

    def handle_function_schema(self, schema: AnyFunctionSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the inner schema of a function schema; schemas without one are returned unchanged."""
        if not is_function_with_inner_schema(schema):
            return schema
        schema['schema'] = self.walk(schema['schema'], f)
        return schema

    def handle_union_schema(self, schema: core_schema.UnionSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every choice of a union schema, keeping the second element of tuple choices as-is."""
        new_choices: list[CoreSchema | tuple[CoreSchema, str]] = []
        for v in schema['choices']:
            if isinstance(v, tuple):
                new_choices.append((self.walk(v[0], f), v[1]))
            else:
                new_choices.append(self.walk(v, f))
        schema['choices'] = new_choices
        return schema

    def handle_tagged_union_schema(self, schema: core_schema.TaggedUnionSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every choice of a tagged union; `str` / `int` choice values are copied over unwalked."""
        new_choices: dict[Hashable, core_schema.CoreSchema] = {}
        for k, v in schema['choices'].items():
            new_choices[k] = v if isinstance(v, (str, int)) else self.walk(v, f)
        schema['choices'] = new_choices
        return schema

    def handle_chain_schema(self, schema: core_schema.ChainSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk each step of a chain schema."""
        schema['steps'] = [self.walk(v, f) for v in schema['steps']]
        return schema

    def handle_lax_or_strict_schema(self, schema: core_schema.LaxOrStrictSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk both the `lax_schema` and the `strict_schema`."""
        schema['lax_schema'] = self.walk(schema['lax_schema'], f)
        schema['strict_schema'] = self.walk(schema['strict_schema'], f)
        return schema

    def handle_json_or_python_schema(self, schema: core_schema.JsonOrPythonSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk both the `json_schema` and the `python_schema`."""
        schema['json_schema'] = self.walk(schema['json_schema'], f)
        schema['python_schema'] = self.walk(schema['python_schema'], f)
        return schema

    def handle_model_fields_schema(self, schema: core_schema.ModelFieldsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the extras schema, computed-field return schemas and field schemas of a model-fields schema.

        Fields and computed fields are copied before their nested schema is replaced.
        """
        extras_schema = schema.get('extras_schema')
        if extras_schema is not None:
            schema['extras_schema'] = self.walk(extras_schema, f)
        replaced_fields: dict[str, core_schema.ModelField] = {}
        replaced_computed_fields: list[core_schema.ComputedField] = []
        for computed_field in schema.get('computed_fields', ()):
            replaced_field = computed_field.copy()
            replaced_field['return_schema'] = self.walk(computed_field['return_schema'], f)
            replaced_computed_fields.append(replaced_field)
        if replaced_computed_fields:
            schema['computed_fields'] = replaced_computed_fields
        for k, v in schema['fields'].items():
            replaced_field = v.copy()
            replaced_field['schema'] = self.walk(v['schema'], f)
            replaced_fields[k] = replaced_field
        schema['fields'] = replaced_fields
        return schema

    def handle_typed_dict_schema(self, schema: core_schema.TypedDictSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the extras schema, computed-field return schemas and field schemas of a typed-dict schema.

        Fields and computed fields are copied before their nested schema is replaced.
        """
        extras_schema = schema.get('extras_schema')
        if extras_schema is not None:
            schema['extras_schema'] = self.walk(extras_schema, f)
        replaced_computed_fields: list[core_schema.ComputedField] = []
        for computed_field in schema.get('computed_fields', ()):
            replaced_field = computed_field.copy()
            replaced_field['return_schema'] = self.walk(computed_field['return_schema'], f)
            replaced_computed_fields.append(replaced_field)
        if replaced_computed_fields:
            schema['computed_fields'] = replaced_computed_fields
        replaced_fields: dict[str, core_schema.TypedDictField] = {}
        for k, v in schema['fields'].items():
            replaced_field = v.copy()
            replaced_field['schema'] = self.walk(v['schema'], f)
            replaced_fields[k] = replaced_field
        schema['fields'] = replaced_fields
        return schema

    def handle_dataclass_args_schema(self, schema: core_schema.DataclassArgsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the computed-field return schemas and field schemas of a dataclass-args schema.

        Fields and computed fields are copied before their nested schema is replaced.
        """
        replaced_fields: list[core_schema.DataclassField] = []
        replaced_computed_fields: list[core_schema.ComputedField] = []
        for computed_field in schema.get('computed_fields', ()):
            replaced_field = computed_field.copy()
            replaced_field['return_schema'] = self.walk(computed_field['return_schema'], f)
            replaced_computed_fields.append(replaced_field)
        if replaced_computed_fields:
            schema['computed_fields'] = replaced_computed_fields
        for field in schema['fields']:
            replaced_field = field.copy()
            replaced_field['schema'] = self.walk(field['schema'], f)
            replaced_fields.append(replaced_field)
        schema['fields'] = replaced_fields
        return schema

    def handle_arguments_schema(self, schema: core_schema.ArgumentsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk each parameter schema plus the optional `var_args_schema` / `var_kwargs_schema`."""
        replaced_arguments_schema: list[core_schema.ArgumentsParameter] = []
        for param in schema['arguments_schema']:
            replaced_param = param.copy()
            replaced_param['schema'] = self.walk(param['schema'], f)
            replaced_arguments_schema.append(replaced_param)
        schema['arguments_schema'] = replaced_arguments_schema
        if 'var_args_schema' in schema:
            schema['var_args_schema'] = self.walk(schema['var_args_schema'], f)
        if 'var_kwargs_schema' in schema:
            schema['var_kwargs_schema'] = self.walk(schema['var_kwargs_schema'], f)
        return schema

    def handle_call_schema(self, schema: core_schema.CallSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the `arguments_schema` and, when present, the `return_schema` of a call schema."""
        schema['arguments_schema'] = self.walk(schema['arguments_schema'], f)
        if 'return_schema' in schema:
            schema['return_schema'] = self.walk(schema['return_schema'], f)
        return schema
_dispatch = _WalkCoreSchema().walk


def walk_core_schema(schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
    """Recursively traverse a CoreSchema.

    Args:
        schema (core_schema.CoreSchema): The CoreSchema to process, it will not be modified.
            `f` is handed a shallow copy of it.
        f (Walk): A function to apply. This function takes two arguments:
            1. The current CoreSchema that is being processed
               (not the same one you passed into this function, one level down).
            2. The "next" `f` to call. This lets you for example use `f=functools.partial(some_method, some_context)`
               to pass data down the recursive calls without using globals or other mutable state.

    Returns:
        core_schema.CoreSchema: A processed CoreSchema.
    """
    top_level_copy = schema.copy()
    return f(top_level_copy, _dispatch)
417def simplify_schema_references(schema: core_schema.CoreSchema) -> core_schema.CoreSchema: # noqa: C901 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
418 definitions: dict[str, core_schema.CoreSchema] = {} 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
419 ref_counts: dict[str, int] = defaultdict(int) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
420 involved_in_recursion: dict[str, bool] = {} 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
421 current_recursion_ref_count: dict[str, int] = defaultdict(int) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
423 def collect_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
424 if s['type'] == 'definitions': 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
425 for definition in s['definitions']: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
426 ref = get_ref(definition) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
427 assert ref is not None 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
428 if ref not in definitions: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
429 definitions[ref] = definition 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
430 recurse(definition, collect_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
431 return recurse(s['schema'], collect_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
432 else:
433 ref = get_ref(s) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
434 if ref is not None: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
435 new = recurse(s, collect_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
436 new_ref = get_ref(new) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
437 if new_ref: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
438 definitions[new_ref] = new 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
439 return core_schema.definition_reference_schema(schema_ref=ref) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
440 else:
441 return recurse(s, collect_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
443 schema = walk_core_schema(schema, collect_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
445 def count_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
446 if s['type'] != 'definition-ref': 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
447 return recurse(s, count_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
448 ref = s['schema_ref'] 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
449 ref_counts[ref] += 1 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
451 if ref_counts[ref] >= 2: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
452 # If this model is involved in a recursion this should be detected
453 # on its second encounter, we can safely stop the walk here.
454 if current_recursion_ref_count[ref] != 0: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
455 involved_in_recursion[ref] = True 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
456 return s 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
458 current_recursion_ref_count[ref] += 1 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
459 recurse(definitions[ref], count_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
460 current_recursion_ref_count[ref] -= 1 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
461 return s 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
463 schema = walk_core_schema(schema, count_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
465 assert all(c == 0 for c in current_recursion_ref_count.values()), 'this is a bug! please report it' 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
def can_be_inlined(s: core_schema.DefinitionReferenceSchema, ref: str) -> bool:
    """Decide whether the reference schema `s` pointing at `ref` may be replaced by its definition.

    Inlining is refused when the reference is used more than once, is flagged as
    involved in recursion, carries a `serialization` key, or has metadata holding
    any of the keys listed below.

    NOTE(review): `ref_counts` and `involved_in_recursion` come from the enclosing
    function's scope.
    """
    # Metadata keys whose presence means the schema has to stay a reference.
    keep_as_ref_keys = (
        'pydantic_js_functions',
        'pydantic_js_annotation_functions',
        'pydantic.internal.union_discriminator',
    )

    if ref_counts[ref] > 1 or involved_in_recursion.get(ref, False) or 'serialization' in s:
        return False

    if 'metadata' in s:
        ref_metadata = s['metadata']
        return not any(key in ref_metadata for key in keep_as_ref_keys)

    return True
def inline_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
    """Walk callback that swaps an inlinable `definition-ref` for the schema it points to.

    Anything that is not a reference, or a reference `can_be_inlined` rejects, is
    recursed into unchanged. An inlinable reference is replaced by its definition,
    which is removed from `definitions` and has its use count decremented, and the
    walk continues inside the inlined schema.

    NOTE(review): `definitions` and `ref_counts` come from the enclosing function's scope.
    """
    # A reference is only inlined if it is used once, not involved in recursion and
    # does not have any extra keys (like 'serialization').
    if s['type'] != 'definition-ref' or not can_be_inlined(s, s['schema_ref']):
        return recurse(s, inline_refs)

    target = s['schema_ref']
    # Inline the reference by replacing the reference with the actual schema.
    inlined = definitions.pop(target)
    ref_counts[target] -= 1  # because we just replaced it!

    # Put all other keys that were on the def-ref schema into the inlined version;
    # in particular this is needed for `serialization`.
    if 'serialization' in s:
        inlined['serialization'] = s['serialization']

    return recurse(inlined, inline_refs)
506 schema = walk_core_schema(schema, inline_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
508 def_values = [v for v in definitions.values() if ref_counts[v['ref']] > 0] # type: ignore 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
510 if def_values: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
511 schema = core_schema.definitions_schema(schema=schema, definitions=def_values) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
512 return schema 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr
def _strip_metadata(schema: CoreSchema) -> CoreSchema:
    """Return a version of `schema` with `metadata` entries and a few default-valued keys removed.

    The schema is traversed with `walk_core_schema`; at every node a shallow copy is
    made and cleaned, so the objects passed in by the caller are not modified by the
    cleaning done here.

    Args:
        schema: The core schema to clean up.

    Returns:
        The schema produced by `walk_core_schema` using the stripping callback.
    """

    def strip_metadata(s: CoreSchema, recurse: Recurse) -> CoreSchema:
        # Work on a shallow copy so the key removals below never touch the input.
        s = s.copy()
        s.pop('metadata', None)

        if s['type'] == 'model-fields':
            # Copy each field before stripping it so the original field dicts keep their metadata.
            stripped_fields = {name: field.copy() for name, field in s['fields'].items()}
            for field in stripped_fields.values():
                field.pop('metadata', None)
            s['fields'] = stripped_fields

            computed_fields = s.get('computed_fields', None)
            if computed_fields:
                # Strip the metadata from the *copies*: popping from the originals would
                # mutate the caller's schema and leave the metadata in the returned one.
                stripped_computed = [cf.copy() for cf in computed_fields]
                for cf in stripped_computed:
                    cf.pop('metadata', None)
                s['computed_fields'] = stripped_computed
            else:
                # Drop an empty/absent computed_fields entry entirely.
                s.pop('computed_fields', None)
        elif s['type'] == 'model':
            # remove some defaults
            if s.get('custom_init', True) is False:
                s.pop('custom_init')
            if s.get('root_model', True) is False:
                s.pop('root_model')
            # A config that is missing, empty, or only carries 'title' is dropped as well.
            if {'title'}.issuperset(s.get('config', {}).keys()):
                s.pop('config', None)

        return recurse(s, strip_metadata)

    return walk_core_schema(schema, strip_metadata)
def pretty_print_core_schema(
    schema: CoreSchema,
    include_metadata: bool = False,
) -> None:
    """Print a CoreSchema through rich's `print`; meant as a debugging aid.

    Args:
        schema: The CoreSchema to print.
        include_metadata: When `False` (the default), the schema is first passed
            through `_strip_metadata`; when `True` it is printed as given.
    """
    # Imported lazily and aliased so the builtin `print` is not shadowed.
    from rich import print as rich_print  # type: ignore  # install it manually in your dev env

    printable = schema if include_metadata else _strip_metadata(schema)
    return rich_print(printable)
def validate_core_schema(schema: CoreSchema) -> CoreSchema:
    """Validate `schema` with pydantic-core unless validation is switched off via the environment.

    If the `PYDANTIC_SKIP_VALIDATING_CORE_SCHEMAS` environment variable is set (to any
    value, including an empty one), `schema` is returned untouched; otherwise the
    result of `pydantic_core.validate_core_schema` is returned.
    """
    skip_validation = 'PYDANTIC_SKIP_VALIDATING_CORE_SCHEMAS' in os.environ
    return schema if skip_validation else _validate_core_schema(schema)