Coverage for pydantic/_internal/_schema_gather.py: 95.63%

127 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-18 09:13 +0000

1# pyright: reportTypedDictNotRequiredAccess=false, reportGeneralTypeIssues=false, reportArgumentType=false, reportAttributeAccessIssue=false 

2from __future__ import annotations 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

3 

4from dataclasses import dataclass, field 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

5from typing import TypedDict 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

6 

7from pydantic_core.core_schema import ComputedField, CoreSchema, DefinitionReferenceSchema, SerSchema 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

8from typing_extensions import TypeAlias 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

9 

10AllSchemas: TypeAlias = 'CoreSchema | SerSchema | ComputedField' 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

11 

12 

13class GatherResult(TypedDict): 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

14 """Schema traversing result.""" 

15 

16 collected_references: dict[str, DefinitionReferenceSchema | None] 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

17 """The collected definition references. 1abcdefghijklmnopqrsJKLMNOtuvwxyzAB

18 

19 If a definition reference schema can be inlined, it means that there is 

20 only one in the whole core schema. As such, it is stored as the value. 

21 Otherwise, the value is set to `None`. 

22 """ 

23 

24 deferred_discriminator_schemas: list[CoreSchema] 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

25 """The list of core schemas having the discriminator application deferred.""" 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

26 

27 

28class MissingDefinitionError(LookupError): 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

29 """A reference was pointing to a non-existing core schema.""" 

30 

31 def __init__(self, schema_reference: str, /) -> None: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

32 self.schema_reference = schema_reference 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

33 

34 

35@dataclass 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

36class GatherContext: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

37 """The current context used during core schema traversing. 

38 

39 Context instances should only be used during schema traversing. 

40 """ 

41 

42 definitions: dict[str, CoreSchema] 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

43 """The available definitions.""" 1abcdefghijklmnopqrsJKLMNOtuvwxyzAB

44 

45 deferred_discriminator_schemas: list[CoreSchema] = field(init=False, default_factory=list) 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

46 """The list of core schemas having the discriminator application deferred. 1abcdefghijklmnopqrsJKLMNOtuvwxyzAB

47 

48 Internally, these core schemas have a specific key set in the core metadata dict. 

49 """ 

50 

51 collected_references: dict[str, DefinitionReferenceSchema | None] = field(init=False, default_factory=dict) 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

52 """The collected definition references. 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

53 

54 If a definition reference schema can be inlined, it means that there is 

55 only one in the whole core schema. As such, it is stored as the value. 

56 Otherwise, the value is set to `None`. 

57 

58 During schema traversing, definition reference schemas can be added as candidates, or removed 

59 (by setting the value to `None`). 

60 """ 

61 

62 

63def traverse_metadata(schema: AllSchemas, ctx: GatherContext) -> None: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

64 meta = schema.get('metadata') 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

65 if meta is not None and 'pydantic_internal_union_discriminator' in meta: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

66 ctx.deferred_discriminator_schemas.append(schema) # pyright: ignore[reportArgumentType] 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

67 

68 

69def traverse_definition_ref(def_ref_schema: DefinitionReferenceSchema, ctx: GatherContext) -> None: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

70 schema_ref = def_ref_schema['schema_ref'] 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

71 

72 if schema_ref not in ctx.collected_references: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

73 definition = ctx.definitions.get(schema_ref) 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

74 if definition is None: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

75 raise MissingDefinitionError(schema_ref) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

76 

77 # The `'definition-ref'` schema was only encountered once, make it 

78 # a candidate to be inlined: 

79 ctx.collected_references[schema_ref] = def_ref_schema 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

80 traverse_schema(definition, ctx) 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

81 if 'serialization' in def_ref_schema: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

82 traverse_schema(def_ref_schema['serialization'], ctx) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

83 traverse_metadata(def_ref_schema, ctx) 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

84 else: 

85 # The `'definition-ref'` schema was already encountered, meaning 

86 # the previously encountered schema (and this one) can't be inlined: 

87 ctx.collected_references[schema_ref] = None 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

88 

89 

90def traverse_schema(schema: AllSchemas, context: GatherContext) -> None: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

91 # TODO When we drop 3.9, use a match statement to get better type checking and remove 

92 # file-level type ignore. 

93 # (the `'type'` could also be fetched in every `if/elif` statement, but this alters performance). 

94 schema_type = schema['type'] 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

95 

96 if schema_type == 'definition-ref': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

97 traverse_definition_ref(schema, context) 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

98 # `traverse_definition_ref` handles the possible serialization and metadata schemas: 

99 return 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

100 elif schema_type == 'definitions': 100 ↛ 101line 100 didn't jump to line 101 because the condition on line 100 was never true1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

101 traverse_schema(schema['schema'], context) 

102 for definition in schema['definitions']: 

103 traverse_schema(definition, context) 

104 elif schema_type in {'list', 'set', 'frozenset', 'generator'}: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

105 if 'items_schema' in schema: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

106 traverse_schema(schema['items_schema'], context) 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

107 elif schema_type == 'tuple': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

108 if 'items_schema' in schema: 108 ↛ 182line 108 didn't jump to line 182 because the condition on line 108 was always true1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

109 for s in schema['items_schema']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

110 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

111 elif schema_type == 'dict': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

112 if 'keys_schema' in schema: 112 ↛ 114line 112 didn't jump to line 114 because the condition on line 112 was always true1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

113 traverse_schema(schema['keys_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

114 if 'values_schema' in schema: 114 ↛ 182line 114 didn't jump to line 182 because the condition on line 114 was always true1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

115 traverse_schema(schema['values_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

116 elif schema_type == 'union': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

117 for choice in schema['choices']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

118 if isinstance(choice, tuple): 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

119 traverse_schema(choice[0], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

120 else: 

121 traverse_schema(choice, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

122 elif schema_type == 'tagged-union': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

123 for v in schema['choices'].values(): 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

124 traverse_schema(v, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

125 elif schema_type == 'chain': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

126 for step in schema['steps']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

127 traverse_schema(step, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

128 elif schema_type == 'lax-or-strict': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

129 traverse_schema(schema['lax_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

130 traverse_schema(schema['strict_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

131 elif schema_type == 'json-or-python': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

132 traverse_schema(schema['json_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

133 traverse_schema(schema['python_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

134 elif schema_type in {'model-fields', 'typed-dict'}: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

135 if 'extras_schema' in schema: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

136 traverse_schema(schema['extras_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

137 if 'computed_fields' in schema: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

138 for s in schema['computed_fields']: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

139 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

140 for s in schema['fields'].values(): 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

141 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

142 elif schema_type == 'dataclass-args': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

143 if 'computed_fields' in schema: 143 ↛ 146line 143 didn't jump to line 146 because the condition on line 143 was always true1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

144 for s in schema['computed_fields']: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

145 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

146 for s in schema['fields']: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

147 traverse_schema(s, context) 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

148 elif schema_type == 'arguments': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

149 for s in schema['arguments_schema']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

150 traverse_schema(s['schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

151 if 'var_args_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

152 traverse_schema(schema['var_args_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

153 if 'var_kwargs_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

154 traverse_schema(schema['var_kwargs_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

155 elif schema_type == 'arguments-v3': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

156 for s in schema['arguments_schema']: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

157 traverse_schema(s['schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

158 elif schema_type == 'call': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

159 traverse_schema(schema['arguments_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

160 if 'return_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

161 traverse_schema(schema['return_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

162 elif schema_type == 'computed-field': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

163 traverse_schema(schema['return_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

164 elif schema_type == 'function-plain': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

165 # TODO duplicate schema types for serializers and validators, needs to be deduplicated. 

166 if 'return_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

167 traverse_schema(schema['return_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

168 if 'json_schema_input_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

169 traverse_schema(schema['json_schema_input_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

170 elif schema_type == 'function-wrap': 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

171 # TODO duplicate schema types for serializers and validators, needs to be deduplicated. 

172 if 'return_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

173 traverse_schema(schema['return_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

174 if 'schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

175 traverse_schema(schema['schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

176 if 'json_schema_input_schema' in schema: 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

177 traverse_schema(schema['json_schema_input_schema'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

178 else: 

179 if 'schema' in schema: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

180 traverse_schema(schema['schema'], context) 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

181 

182 if 'serialization' in schema: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

183 traverse_schema(schema['serialization'], context) 1CDabcdefghiEjFGklmnopqrsHItuvwxyzAB

184 traverse_metadata(schema, context) 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

185 

186 

187def gather_schemas_for_cleaning(schema: CoreSchema, definitions: dict[str, CoreSchema]) -> GatherResult: 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

188 """Traverse the core schema and definitions and return the necessary information for schema cleaning. 

189 

190 During the core schema traversing, any `'definition-ref'` schema is: 

191 

192 - Validated: the reference must point to an existing definition. If this is not the case, a 

193 `MissingDefinitionError` exception is raised. 

194 - Stored in the context: the actual reference is stored in the context. Depending on whether 

195 the `'definition-ref'` schema is encountered more that once, the schema itself is also 

196 saved in the context to be inlined (i.e. replaced by the definition it points to). 

197 """ 

198 context = GatherContext(definitions) 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

199 traverse_schema(schema, context) 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

200 

201 return { 1CDabcdefghiEjFGklmnopqrsPJKLMNOHItuvwxyzAB

202 'collected_references': context.collected_references, 

203 'deferred_discriminator_schemas': context.deferred_discriminator_schemas, 

204 }