Coverage for pydantic/_internal/_schema_gather.py: 95.42%
132 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-20 10:39 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-20 10:39 +0000
1# pyright: reportTypedDictNotRequiredAccess=false, reportGeneralTypeIssues=false, reportArgumentType=false, reportAttributeAccessIssue=false
2from __future__ import annotations 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
4from dataclasses import dataclass, field 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
5from typing import TypedDict 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
7from pydantic_core.core_schema import ComputedField, CoreSchema, DefinitionReferenceSchema, SerSchema 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
8from typing_extensions import TypeAlias 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
10AllSchemas: TypeAlias = 'CoreSchema | SerSchema | ComputedField' 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
13class GatherResult(TypedDict): 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
14 """Schema traversing result."""
16 collected_references: dict[str, DefinitionReferenceSchema | None] 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
17 """The collected definition references. 1abcdefghijklmnopqrsJtuvwxyzAB
19 If a definition reference schema can be inlined, it means that there is
20 only one in the whole core schema. As such, it is stored as the value.
21 Otherwise, the value is set to `None`.
22 """
24 deferred_discriminator_schemas: list[CoreSchema] 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
25 """The list of core schemas having the discriminator application deferred.""" 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
28class MissingDefinitionError(LookupError): 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
29 """A reference was pointing to a non-existing core schema."""
31 def __init__(self, schema_reference: str, /) -> None: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
32 self.schema_reference = schema_reference 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
35@dataclass 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
36class GatherContext: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
37 """The current context used during core schema traversing.
39 Context instances should only be used during schema traversing.
40 """
42 definitions: dict[str, CoreSchema] 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
43 """The available definitions.""" 1abcdefghijklmnopqrsJtuvwxyzAB
45 deferred_discriminator_schemas: list[CoreSchema] = field(init=False, default_factory=list) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
46 """The list of core schemas having the discriminator application deferred. 1abcdefghijklmnopqrsJtuvwxyzAB
48 Internally, these core schemas have a specific key set in the core metadata dict.
49 """
51 collected_references: dict[str, DefinitionReferenceSchema | None] = field(init=False, default_factory=dict) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
52 """The collected definition references. 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
54 If a definition reference schema can be inlined, it means that there is
55 only one in the whole core schema. As such, it is stored as the value.
56 Otherwise, the value is set to `None`.
58 During schema traversing, definition reference schemas can be added as candidates, or removed
59 (by setting the value to `None`).
60 """
63def traverse_metadata(schema: AllSchemas, ctx: GatherContext) -> None: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
64 meta = schema.get('metadata') 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
65 if meta is not None and 'pydantic_internal_union_discriminator' in meta: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
66 ctx.deferred_discriminator_schemas.append(schema) # pyright: ignore[reportArgumentType] 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
69def traverse_definition_ref(def_ref_schema: DefinitionReferenceSchema, ctx: GatherContext) -> None: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
70 schema_ref = def_ref_schema['schema_ref'] 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
72 if schema_ref not in ctx.collected_references: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
73 definition = ctx.definitions.get(schema_ref) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
74 if definition is None: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
75 raise MissingDefinitionError(schema_ref) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
77 # The `'definition-ref'` schema was only encountered once, make it
78 # a candidate to be inlined:
79 ctx.collected_references[schema_ref] = def_ref_schema 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
80 traverse_schema(definition, ctx) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
81 if 'serialization' in def_ref_schema: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
82 traverse_schema(def_ref_schema['serialization'], ctx) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
83 traverse_metadata(def_ref_schema, ctx) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
84 else:
85 # The `'definition-ref'` schema was already encountered, meaning
86 # the previously encountered schema (and this one) can't be inlined:
87 ctx.collected_references[schema_ref] = None 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
90def traverse_schema(schema: AllSchemas, context: GatherContext) -> None: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
91 # TODO When we drop 3.9, use a match statement to get better type checking and remove
92 # file-level type ignore.
93 # (the `'type'` could also be fetched in every `if/elif` statement, but this alters performance).
94 schema_type = schema['type'] 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
96 if schema_type == 'definition-ref': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
97 traverse_definition_ref(schema, context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
98 # `traverse_definition_ref` handles the possible serialization and metadata schemas:
99 return 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
100 elif schema_type == 'definitions': 100 ↛ 101line 100 didn't jump to line 101 because the condition on line 100 was never true1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
101 traverse_schema(schema['schema'], context)
102 for definition in schema['definitions']:
103 traverse_schema(definition, context)
104 elif schema_type in {'list', 'set', 'frozenset', 'generator'}: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
105 if 'items_schema' in schema: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
106 traverse_schema(schema['items_schema'], context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
107 elif schema_type == 'tuple': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
108 if 'items_schema' in schema: 108 ↛ 187line 108 didn't jump to line 187 because the condition on line 108 was always true1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
109 for s in schema['items_schema']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
110 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
111 elif schema_type == 'dict': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
112 if 'keys_schema' in schema: 112 ↛ 114line 112 didn't jump to line 114 because the condition on line 112 was always true1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
113 traverse_schema(schema['keys_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
114 if 'values_schema' in schema: 114 ↛ 187line 114 didn't jump to line 187 because the condition on line 114 was always true1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
115 traverse_schema(schema['values_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
116 elif schema_type == 'union': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
117 for choice in schema['choices']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
118 if isinstance(choice, tuple): 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
119 traverse_schema(choice[0], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
120 else:
121 traverse_schema(choice, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
122 elif schema_type == 'tagged-union': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
123 for v in schema['choices'].values(): 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
124 traverse_schema(v, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
125 elif schema_type == 'chain': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
126 for step in schema['steps']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
127 traverse_schema(step, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
128 elif schema_type == 'lax-or-strict': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
129 traverse_schema(schema['lax_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
130 traverse_schema(schema['strict_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
131 elif schema_type == 'json-or-python': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
132 traverse_schema(schema['json_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
133 traverse_schema(schema['python_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
134 elif schema_type in {'model-fields', 'typed-dict'}: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
135 if 'extras_schema' in schema: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
136 traverse_schema(schema['extras_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
137 if 'computed_fields' in schema: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
138 for s in schema['computed_fields']: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
139 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
140 for s in schema['fields'].values(): 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
141 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
142 elif schema_type == 'dataclass-args': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
143 if 'computed_fields' in schema: 143 ↛ 146line 143 didn't jump to line 146 because the condition on line 143 was always true1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
144 for s in schema['computed_fields']: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
145 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
146 for s in schema['fields']: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
147 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
148 elif schema_type == 'arguments': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
149 for s in schema['arguments_schema']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
150 traverse_schema(s['schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
151 if 'var_args_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
152 traverse_schema(schema['var_args_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
153 if 'var_kwargs_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
154 traverse_schema(schema['var_kwargs_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
155 elif schema_type == 'arguments-v3': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
156 for s in schema['arguments_schema']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
157 traverse_schema(s['schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
158 elif schema_type == 'call': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
159 traverse_schema(schema['arguments_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
160 if 'return_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
161 traverse_schema(schema['return_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
162 elif schema_type == 'computed-field': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
163 traverse_schema(schema['return_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
164 elif schema_type == 'function-before': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
165 if 'schema' in schema: 165 ↛ 167line 165 didn't jump to line 167 because the condition on line 165 was always true1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
166 traverse_schema(schema['schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
167 if 'json_schema_input_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
168 traverse_schema(schema['json_schema_input_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
169 elif schema_type == 'function-plain': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
170 # TODO duplicate schema types for serializers and validators, needs to be deduplicated.
171 if 'return_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
172 traverse_schema(schema['return_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
173 if 'json_schema_input_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
174 traverse_schema(schema['json_schema_input_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
175 elif schema_type == 'function-wrap': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
176 # TODO duplicate schema types for serializers and validators, needs to be deduplicated.
177 if 'return_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
178 traverse_schema(schema['return_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
179 if 'schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
180 traverse_schema(schema['schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
181 if 'json_schema_input_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
182 traverse_schema(schema['json_schema_input_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
183 else:
184 if 'schema' in schema: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
185 traverse_schema(schema['schema'], context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
187 if 'serialization' in schema: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
188 traverse_schema(schema['serialization'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB
189 traverse_metadata(schema, context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
192def gather_schemas_for_cleaning(schema: CoreSchema, definitions: dict[str, CoreSchema]) -> GatherResult: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
193 """Traverse the core schema and definitions and return the necessary information for schema cleaning.
195 During the core schema traversing, any `'definition-ref'` schema is:
197 - Validated: the reference must point to an existing definition. If this is not the case, a
198 `MissingDefinitionError` exception is raised.
199 - Stored in the context: the actual reference is stored in the context. Depending on whether
200 the `'definition-ref'` schema is encountered more that once, the schema itself is also
201 saved in the context to be inlined (i.e. replaced by the definition it points to).
202 """
203 context = GatherContext(definitions) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
204 traverse_schema(schema, context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
206 return { 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB
207 'collected_references': context.collected_references,
208 'deferred_discriminator_schemas': context.deferred_discriminator_schemas,
209 }