Coverage for pydantic/_internal/_schema_gather.py: 95.42%

132 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-06-04 10:05 +0000

1# pyright: reportTypedDictNotRequiredAccess=false, reportGeneralTypeIssues=false, reportArgumentType=false, reportAttributeAccessIssue=false 

2from __future__ import annotations 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

3 

4from dataclasses import dataclass, field 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

5from typing import TypedDict 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

6 

7from pydantic_core.core_schema import ComputedField, CoreSchema, DefinitionReferenceSchema, SerSchema 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

8from typing_extensions import TypeAlias 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

9 

10AllSchemas: TypeAlias = 'CoreSchema | SerSchema | ComputedField' 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

11 

12 

13class GatherResult(TypedDict): 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

14 """Schema traversing result.""" 

15 

16 collected_references: dict[str, DefinitionReferenceSchema | None] 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

17 """The collected definition references. 1abcdefghijklmnopqrsJtuvwxyzAB

18 

19 If a definition reference schema can be inlined, it means that there is 

20 only one in the whole core schema. As such, it is stored as the value. 

21 Otherwise, the value is set to `None`. 

22 """ 

23 

24 deferred_discriminator_schemas: list[CoreSchema] 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

25 """The list of core schemas having the discriminator application deferred.""" 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

26 

27 

28class MissingDefinitionError(LookupError): 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

29 """A reference was pointing to a non-existing core schema.""" 

30 

31 def __init__(self, schema_reference: str, /) -> None: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

32 self.schema_reference = schema_reference 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

33 

34 

35@dataclass 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

36class GatherContext: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

37 """The current context used during core schema traversing. 

38 

39 Context instances should only be used during schema traversing. 

40 """ 

41 

42 definitions: dict[str, CoreSchema] 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

43 """The available definitions.""" 1abcdefghijklmnopqrsJtuvwxyzAB

44 

45 deferred_discriminator_schemas: list[CoreSchema] = field(init=False, default_factory=list) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

46 """The list of core schemas having the discriminator application deferred. 1abcdefghijklmnopqrsJtuvwxyzAB

47 

48 Internally, these core schemas have a specific key set in the core metadata dict. 

49 """ 

50 

51 collected_references: dict[str, DefinitionReferenceSchema | None] = field(init=False, default_factory=dict) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

52 """The collected definition references. 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

53 

54 If a definition reference schema can be inlined, it means that there is 

55 only one in the whole core schema. As such, it is stored as the value. 

56 Otherwise, the value is set to `None`. 

57 

58 During schema traversing, definition reference schemas can be added as candidates, or removed 

59 (by setting the value to `None`). 

60 """ 

61 

62 

63def traverse_metadata(schema: AllSchemas, ctx: GatherContext) -> None: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

64 meta = schema.get('metadata') 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

65 if meta is not None and 'pydantic_internal_union_discriminator' in meta: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

66 ctx.deferred_discriminator_schemas.append(schema) # pyright: ignore[reportArgumentType] 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

67 

68 

69def traverse_definition_ref(def_ref_schema: DefinitionReferenceSchema, ctx: GatherContext) -> None: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

70 schema_ref = def_ref_schema['schema_ref'] 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

71 

72 if schema_ref not in ctx.collected_references: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

73 definition = ctx.definitions.get(schema_ref) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

74 if definition is None: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

75 raise MissingDefinitionError(schema_ref) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

76 

77 # The `'definition-ref'` schema was only encountered once, make it 

78 # a candidate to be inlined: 

79 ctx.collected_references[schema_ref] = def_ref_schema 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

80 traverse_schema(definition, ctx) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

81 if 'serialization' in def_ref_schema: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

82 traverse_schema(def_ref_schema['serialization'], ctx) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

83 traverse_metadata(def_ref_schema, ctx) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

84 else: 

85 # The `'definition-ref'` schema was already encountered, meaning 

86 # the previously encountered schema (and this one) can't be inlined: 

87 ctx.collected_references[schema_ref] = None 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

88 

89 

90def traverse_schema(schema: AllSchemas, context: GatherContext) -> None: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

91 # TODO When we drop 3.9, use a match statement to get better type checking and remove 

92 # file-level type ignore. 

93 # (the `'type'` could also be fetched in every `if/elif` statement, but this alters performance). 

94 schema_type = schema['type'] 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

95 

96 if schema_type == 'definition-ref': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

97 traverse_definition_ref(schema, context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

98 # `traverse_definition_ref` handles the possible serialization and metadata schemas: 

99 return 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

100 elif schema_type == 'definitions': 100 ↛ 101line 100 didn't jump to line 101 because the condition on line 100 was never true1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

101 traverse_schema(schema['schema'], context) 

102 for definition in schema['definitions']: 

103 traverse_schema(definition, context) 

104 elif schema_type in {'list', 'set', 'frozenset', 'generator'}: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

105 if 'items_schema' in schema: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

106 traverse_schema(schema['items_schema'], context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

107 elif schema_type == 'tuple': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

108 if 'items_schema' in schema: 108 ↛ 187line 108 didn't jump to line 187 because the condition on line 108 was always true1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

109 for s in schema['items_schema']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

110 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

111 elif schema_type == 'dict': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

112 if 'keys_schema' in schema: 112 ↛ 114line 112 didn't jump to line 114 because the condition on line 112 was always true1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

113 traverse_schema(schema['keys_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

114 if 'values_schema' in schema: 114 ↛ 187line 114 didn't jump to line 187 because the condition on line 114 was always true1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

115 traverse_schema(schema['values_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

116 elif schema_type == 'union': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

117 for choice in schema['choices']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

118 if isinstance(choice, tuple): 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

119 traverse_schema(choice[0], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

120 else: 

121 traverse_schema(choice, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

122 elif schema_type == 'tagged-union': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

123 for v in schema['choices'].values(): 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

124 traverse_schema(v, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

125 elif schema_type == 'chain': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

126 for step in schema['steps']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

127 traverse_schema(step, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

128 elif schema_type == 'lax-or-strict': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

129 traverse_schema(schema['lax_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

130 traverse_schema(schema['strict_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

131 elif schema_type == 'json-or-python': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

132 traverse_schema(schema['json_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

133 traverse_schema(schema['python_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

134 elif schema_type in {'model-fields', 'typed-dict'}: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

135 if 'extras_schema' in schema: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

136 traverse_schema(schema['extras_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

137 if 'computed_fields' in schema: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

138 for s in schema['computed_fields']: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

139 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

140 for s in schema['fields'].values(): 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

141 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

142 elif schema_type == 'dataclass-args': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

143 if 'computed_fields' in schema: 143 ↛ 146line 143 didn't jump to line 146 because the condition on line 143 was always true1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

144 for s in schema['computed_fields']: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

145 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

146 for s in schema['fields']: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

147 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

148 elif schema_type == 'arguments': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

149 for s in schema['arguments_schema']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

150 traverse_schema(s['schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

151 if 'var_args_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

152 traverse_schema(schema['var_args_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

153 if 'var_kwargs_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

154 traverse_schema(schema['var_kwargs_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

155 elif schema_type == 'arguments-v3': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

156 for s in schema['arguments_schema']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

157 traverse_schema(s['schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

158 elif schema_type == 'call': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

159 traverse_schema(schema['arguments_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

160 if 'return_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

161 traverse_schema(schema['return_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

162 elif schema_type == 'computed-field': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

163 traverse_schema(schema['return_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

164 elif schema_type == 'function-before': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

165 if 'schema' in schema: 165 ↛ 167line 165 didn't jump to line 167 because the condition on line 165 was always true1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

166 traverse_schema(schema['schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

167 if 'json_schema_input_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

168 traverse_schema(schema['json_schema_input_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

169 elif schema_type == 'function-plain': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

170 # TODO duplicate schema types for serializers and validators, needs to be deduplicated. 

171 if 'return_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

172 traverse_schema(schema['return_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

173 if 'json_schema_input_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

174 traverse_schema(schema['json_schema_input_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

175 elif schema_type == 'function-wrap': 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

176 # TODO duplicate schema types for serializers and validators, needs to be deduplicated. 

177 if 'return_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

178 traverse_schema(schema['return_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

179 if 'schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

180 traverse_schema(schema['schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

181 if 'json_schema_input_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

182 traverse_schema(schema['json_schema_input_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

183 else: 

184 if 'schema' in schema: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

185 traverse_schema(schema['schema'], context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

186 

187 if 'serialization' in schema: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

188 traverse_schema(schema['serialization'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

189 traverse_metadata(schema, context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

190 

191 

192def gather_schemas_for_cleaning(schema: CoreSchema, definitions: dict[str, CoreSchema]) -> GatherResult: 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

193 """Traverse the core schema and definitions and return the necessary information for schema cleaning. 

194 

195 During the core schema traversing, any `'definition-ref'` schema is: 

196 

197 - Validated: the reference must point to an existing definition. If this is not the case, a 

198 `MissingDefinitionError` exception is raised. 

199 - Stored in the context: the actual reference is stored in the context. Depending on whether 

200 the `'definition-ref'` schema is encountered more that once, the schema itself is also 

201 saved in the context to be inlined (i.e. replaced by the definition it points to). 

202 """ 

203 context = GatherContext(definitions) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

204 traverse_schema(schema, context) 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

205 

206 return { 1CDabcdefghiEjFGklmnopqrsJHItuvwxyzAB

207 'collected_references': context.collected_references, 

208 'deferred_discriminator_schemas': context.deferred_discriminator_schemas, 

209 }