Coverage for pydantic/_internal/_schema_gather.py: 95.42%

132 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-20 16:49 +0000

1# pyright: reportTypedDictNotRequiredAccess=false, reportGeneralTypeIssues=false, reportArgumentType=false, reportAttributeAccessIssue=false 

2from __future__ import annotations 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

3 

4from dataclasses import dataclass, field 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

5from typing import TypedDict 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

6 

7from pydantic_core.core_schema import ComputedField, CoreSchema, DefinitionReferenceSchema, SerSchema 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

8from typing_extensions import TypeAlias 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

9 

10AllSchemas: TypeAlias = 'CoreSchema | SerSchema | ComputedField' 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

11 

12 

13class GatherResult(TypedDict): 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

14 """Schema traversing result.""" 

15 

16 collected_references: dict[str, DefinitionReferenceSchema | None] 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

17 """The collected definition references. 1abcdefghijMklmnopqrstuNPvwxyzABCDEO

18 

19 If a definition reference schema can be inlined, it means that there is 

20 only one in the whole core schema. As such, it is stored as the value. 

21 Otherwise, the value is set to `None`. 

22 """ 

23 

24 deferred_discriminator_schemas: list[CoreSchema] 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

25 """The list of core schemas having the discriminator application deferred.""" 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

26 

27 

28class MissingDefinitionError(LookupError): 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

29 """A reference was pointing to a non-existing core schema.""" 

30 

31 def __init__(self, schema_reference: str, /) -> None: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

32 self.schema_reference = schema_reference 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

33 

34 

35@dataclass 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

36class GatherContext: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

37 """The current context used during core schema traversing. 

38 

39 Context instances should only be used during schema traversing. 

40 """ 

41 

42 definitions: dict[str, CoreSchema] 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

43 """The available definitions.""" 1abcdefghijMklmnopqrstuNPvwxyzABCDEO

44 

45 deferred_discriminator_schemas: list[CoreSchema] = field(init=False, default_factory=list) 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

46 """The list of core schemas having the discriminator application deferred. 1abcdefghijMklmnopqrstuNPvwxyzABCDEO

47 

48 Internally, these core schemas have a specific key set in the core metadata dict. 

49 """ 

50 

51 collected_references: dict[str, DefinitionReferenceSchema | None] = field(init=False, default_factory=dict) 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

52 """The collected definition references. 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

53 

54 If a definition reference schema can be inlined, it means that there is 

55 only one in the whole core schema. As such, it is stored as the value. 

56 Otherwise, the value is set to `None`. 

57 

58 During schema traversing, definition reference schemas can be added as candidates, or removed 

59 (by setting the value to `None`). 

60 """ 

61 

62 

63def traverse_metadata(schema: AllSchemas, ctx: GatherContext) -> None: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

64 meta = schema.get('metadata') 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

65 if meta is not None and 'pydantic_internal_union_discriminator' in meta: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

66 ctx.deferred_discriminator_schemas.append(schema) # pyright: ignore[reportArgumentType] 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

67 

68 

69def traverse_definition_ref(def_ref_schema: DefinitionReferenceSchema, ctx: GatherContext) -> None: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

70 schema_ref = def_ref_schema['schema_ref'] 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

71 

72 if schema_ref not in ctx.collected_references: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

73 definition = ctx.definitions.get(schema_ref) 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

74 if definition is None: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

75 raise MissingDefinitionError(schema_ref) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

76 

77 # The `'definition-ref'` schema was only encountered once, make it 

78 # a candidate to be inlined: 

79 ctx.collected_references[schema_ref] = def_ref_schema 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

80 traverse_schema(definition, ctx) 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

81 if 'serialization' in def_ref_schema: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

82 traverse_schema(def_ref_schema['serialization'], ctx) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

83 traverse_metadata(def_ref_schema, ctx) 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

84 else: 

85 # The `'definition-ref'` schema was already encountered, meaning 

86 # the previously encountered schema (and this one) can't be inlined: 

87 ctx.collected_references[schema_ref] = None 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

88 

89 

90def traverse_schema(schema: AllSchemas, context: GatherContext) -> None: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

91 # TODO When we drop 3.9, use a match statement to get better type checking and remove 

92 # file-level type ignore. 

93 # (the `'type'` could also be fetched in every `if/elif` statement, but this alters performance). 

94 schema_type = schema['type'] 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

95 

96 if schema_type == 'definition-ref': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

97 traverse_definition_ref(schema, context) 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

98 # `traverse_definition_ref` handles the possible serialization and metadata schemas: 

99 return 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

100 elif schema_type == 'definitions': 100 ↛ 101line 100 didn't jump to line 101 because the condition on line 100 was never true1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

101 traverse_schema(schema['schema'], context) 

102 for definition in schema['definitions']: 

103 traverse_schema(definition, context) 

104 elif schema_type in {'list', 'set', 'frozenset', 'generator'}: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

105 if 'items_schema' in schema: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

106 traverse_schema(schema['items_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

107 elif schema_type == 'tuple': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

108 if 'items_schema' in schema: 108 ↛ 187line 108 didn't jump to line 187 because the condition on line 108 was always true1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

109 for s in schema['items_schema']: 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

110 traverse_schema(s, context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

111 elif schema_type == 'dict': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

112 if 'keys_schema' in schema: 112 ↛ 114line 112 didn't jump to line 114 because the condition on line 112 was always true1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

113 traverse_schema(schema['keys_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

114 if 'values_schema' in schema: 114 ↛ 187line 114 didn't jump to line 187 because the condition on line 114 was always true1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

115 traverse_schema(schema['values_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

116 elif schema_type == 'union': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

117 for choice in schema['choices']: 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

118 if isinstance(choice, tuple): 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

119 traverse_schema(choice[0], context) 1FGabcdefghijHkIJlmnopqrstuKLvwxyzABCDE

120 else: 

121 traverse_schema(choice, context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

122 elif schema_type == 'tagged-union': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

123 for v in schema['choices'].values(): 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

124 traverse_schema(v, context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

125 elif schema_type == 'chain': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

126 for step in schema['steps']: 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

127 traverse_schema(step, context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

128 elif schema_type == 'lax-or-strict': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

129 traverse_schema(schema['lax_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

130 traverse_schema(schema['strict_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

131 elif schema_type == 'json-or-python': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

132 traverse_schema(schema['json_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

133 traverse_schema(schema['python_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

134 elif schema_type in {'model-fields', 'typed-dict'}: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

135 if 'extras_schema' in schema: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

136 traverse_schema(schema['extras_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

137 if 'computed_fields' in schema: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

138 for s in schema['computed_fields']: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

139 traverse_schema(s, context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

140 for s in schema['fields'].values(): 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

141 traverse_schema(s, context) 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

142 elif schema_type == 'dataclass-args': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

143 if 'computed_fields' in schema: 143 ↛ 146line 143 didn't jump to line 146 because the condition on line 143 was always true1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

144 for s in schema['computed_fields']: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

145 traverse_schema(s, context) 1FGabcdefghijHkIJlmnopqrstuKLvwxyzABCDE

146 for s in schema['fields']: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

147 traverse_schema(s, context) 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

148 elif schema_type == 'arguments': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

149 for s in schema['arguments_schema']: 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

150 traverse_schema(s['schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

151 if 'var_args_schema' in schema: 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

152 traverse_schema(schema['var_args_schema'], context) 1FGabcdefghijHkIJlmnopqrstuKLvwxyzABCDE

153 if 'var_kwargs_schema' in schema: 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

154 traverse_schema(schema['var_kwargs_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

155 elif schema_type == 'arguments-v3': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

156 for s in schema['arguments_schema']: 1FGabcdefghijHkIJlmnopqrstuKLvwxyzABCDE

157 traverse_schema(s['schema'], context) 1FGabcdefghijHkIJlmnopqrstuKLvwxyzABCDE

158 elif schema_type == 'call': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

159 traverse_schema(schema['arguments_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

160 if 'return_schema' in schema: 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

161 traverse_schema(schema['return_schema'], context) 1FGabcdefghijHkIJlmnopqrstuKLvwxyzABCDE

162 elif schema_type == 'computed-field': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

163 traverse_schema(schema['return_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

164 elif schema_type == 'function-before': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

165 if 'schema' in schema: 165 ↛ 167line 165 didn't jump to line 167 because the condition on line 165 was always true1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

166 traverse_schema(schema['schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

167 if 'json_schema_input_schema' in schema: 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

168 traverse_schema(schema['json_schema_input_schema'], context) 1FGabcdefghijHkIJlmnopqrstuKLvwxyzABCDE

169 elif schema_type == 'function-plain': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

170 # TODO duplicate schema types for serializers and validators, needs to be deduplicated. 

171 if 'return_schema' in schema: 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

172 traverse_schema(schema['return_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

173 if 'json_schema_input_schema' in schema: 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

174 traverse_schema(schema['json_schema_input_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

175 elif schema_type == 'function-wrap': 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

176 # TODO duplicate schema types for serializers and validators, needs to be deduplicated. 

177 if 'return_schema' in schema: 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

178 traverse_schema(schema['return_schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

179 if 'schema' in schema: 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

180 traverse_schema(schema['schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

181 if 'json_schema_input_schema' in schema: 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

182 traverse_schema(schema['json_schema_input_schema'], context) 1FGabcdefghijHkIJlmnopqrstuKLvwxyzABCDE

183 else: 

184 if 'schema' in schema: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

185 traverse_schema(schema['schema'], context) 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

186 

187 if 'serialization' in schema: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

188 traverse_schema(schema['serialization'], context) 1FGabcdefghijMHkIJlmnopqrstuNKLvwxyzABCDEO

189 traverse_metadata(schema, context) 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

190 

191 

192def gather_schemas_for_cleaning(schema: CoreSchema, definitions: dict[str, CoreSchema]) -> GatherResult: 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

193 """Traverse the core schema and definitions and return the necessary information for schema cleaning. 

194 

195 During the core schema traversing, any `'definition-ref'` schema is: 

196 

197 - Validated: the reference must point to an existing definition. If this is not the case, a 

198 `MissingDefinitionError` exception is raised. 

199 - Stored in the context: the actual reference is stored in the context. Depending on whether 

200 the `'definition-ref'` schema is encountered more that once, the schema itself is also 

201 saved in the context to be inlined (i.e. replaced by the definition it points to). 

202 """ 

203 context = GatherContext(definitions) 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

204 traverse_schema(schema, context) 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

205 

206 return { 1FGabcdefghijMHkIJlmnopqrstuNPKLvwxyzABCDEO

207 'collected_references': context.collected_references, 

208 'deferred_discriminator_schemas': context.deferred_discriminator_schemas, 

209 }