Coverage for pydantic/_internal/_core_utils.py: 96.49%

367 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-03 19:29 +0000

1from __future__ import annotations 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx

2 

3import os 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx

4from collections import defaultdict 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx

5from typing import ( 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx

6 Any, 

7 Callable, 

8 Hashable, 

9 TypeVar, 

10 Union, 

11) 

12 

13from pydantic_core import CoreSchema, core_schema 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx

14from pydantic_core import validate_core_schema as _validate_core_schema 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx

15from typing_extensions import TypeAliasType, TypeGuard, get_args, get_origin 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx

16 

17from . import _repr 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx

18from ._typing_extra import is_generic_alias 1zABCabcdefghLyDEFGijklmnopUVMNOPQRSTHIJKqrstuvwx

19 

# Validator-function schemas that wrap an inner schema (stored under their 'schema' key).
FunctionSchemaWithInnerSchema = Union[
    core_schema.AfterValidatorFunctionSchema,
    core_schema.BeforeValidatorFunctionSchema,
    core_schema.WrapValidatorFunctionSchema,
]

# Every validator-function schema: the wrapping ones above plus the plain variant.
# Nested unions are flattened, so this is the same four-member union as spelling it out.
AnyFunctionSchema = Union[
    FunctionSchemaWithInnerSchema,
    core_schema.PlainValidatorFunctionSchema,
]

# Field-level schemas, as opposed to full core schemas.
CoreSchemaField = Union[
    core_schema.ModelField,
    core_schema.DataclassField,
    core_schema.TypedDictField,
    core_schema.ComputedField,
]

# Anything that is either a core schema or one of the field-level schemas.
CoreSchemaOrField = Union[core_schema.CoreSchema, CoreSchemaField]

38 

# 'type' values identifying field-level schemas (see `is_core_schema_field`).
_CORE_SCHEMA_FIELD_TYPES = {
    'typed-dict-field',
    'dataclass-field',
    'model-field',
    'computed-field',
}
# 'type' values of validator-function schemas that carry an inner schema.
_FUNCTION_WITH_INNER_SCHEMA_TYPES = {
    'function-before',
    'function-after',
    'function-wrap',
}
# 'type' values of the list-like schemas matched by `is_list_like_schema_with_items_schema`.
_LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES = {
    'list',
    'set',
    'frozenset',
}

TAGGED_UNION_TAG_KEY = 'pydantic.internal.tagged_union_tag'
"""Used in a `Tag` schema to specify the tag used for a discriminated union."""

HAS_INVALID_SCHEMAS_METADATA_KEY = 'pydantic.internal.invalid'
"""Used to mark a schema that is invalid because it refers to a definition that was not yet
defined when the schema was first encountered.
"""

51 

52 

def is_core_schema(
    schema: CoreSchemaOrField,
) -> TypeGuard[CoreSchema]:
    """Return True when `schema['type']` is not one of the field-level schema types."""
    is_field = schema['type'] in _CORE_SCHEMA_FIELD_TYPES
    return not is_field

57 

58 

def is_core_schema_field(
    schema: CoreSchemaOrField,
) -> TypeGuard[CoreSchemaField]:
    """Return True when `schema['type']` is one of the field-level schema types."""
    schema_type = schema['type']
    return schema_type in _CORE_SCHEMA_FIELD_TYPES

63 

64 

def is_function_with_inner_schema(
    schema: CoreSchemaOrField,
) -> TypeGuard[FunctionSchemaWithInnerSchema]:
    """Return True for the before/after/wrap validator-function schema types."""
    schema_type = schema['type']
    return schema_type in _FUNCTION_WITH_INNER_SCHEMA_TYPES

69 

70 

def is_list_like_schema_with_items_schema(
    schema: CoreSchema,
) -> TypeGuard[core_schema.ListSchema | core_schema.SetSchema | core_schema.FrozenSetSchema]:
    """Return True when `schema['type']` is 'list', 'set' or 'frozenset'."""
    schema_type = schema['type']
    return schema_type in _LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES

75 

76 

def get_type_ref(type_: type[Any], args_override: tuple[type[Any], ...] | None = None) -> str:
    """Build the ref string used for `type_` in pydantic_core's core schemas.

    The ref has the form `<module>.<name>:<id(origin)>`, followed by a bracketed,
    comma-separated list of per-argument refs when the type has generic arguments.

    `args_override` supplies the arguments when `type_` is not itself a generic alias; it exists
    so that valid recursive references can be produced for generic models without having to
    create a concrete class.
    """
    origin = get_origin(type_) or type_

    if is_generic_alias(type_):
        args = get_args(type_)
    else:
        args = args_override or ()

    # Pydantic generic metadata, when present and non-empty, takes precedence over typing introspection.
    generic_metadata = getattr(type_, '__pydantic_generic_metadata__', None)
    if generic_metadata:
        origin = generic_metadata['origin'] or origin
        args = generic_metadata['args'] or args

    module_name = getattr(origin, '__module__', '<No __module__>')
    if isinstance(origin, TypeAliasType):
        display_name = origin.__name__
    else:
        try:
            # The fallback text is formatted eagerly, so a failing `str(origin)` lands in the except branch.
            display_name = getattr(origin, '__qualname__', f'<No __qualname__: {origin}>')
        except Exception:
            display_name = getattr(origin, '__qualname__', '<No __qualname__>')
    type_ref = f'{module_name}.{display_name}:{id(origin)}'

    # String arguments are handled as a special case; this may become removable if they are
    # wrapped in a ForwardRef at some point.
    arg_refs: list[str] = [
        f'{arg}:str-{id(arg)}' if isinstance(arg, str) else f'{_repr.display_as_type(arg)}:{id(arg)}'
        for arg in args
    ]
    if not arg_refs:
        return type_ref
    return f'{type_ref}[{",".join(arg_refs)}]'

113 

114 

def get_ref(s: core_schema.CoreSchema) -> None | str:
    """Return the schema's 'ref' entry, or None when it has none.

    This helper exists only so that type checking works correctly.
    """
    return s.get('ref')

120 

121 

def collect_definitions(schema: core_schema.CoreSchema) -> dict[str, core_schema.CoreSchema]:
    """Walk `schema` and return a mapping from each truthy 'ref' to the schema carrying it.

    When the same ref is met more than once, the schema visited last is the one kept.
    """
    found: dict[str, CoreSchema] = {}

    def _remember(node: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        node_ref = get_ref(node)
        if node_ref:
            # Record the schema as handed to the callback, before its children are walked.
            found[node_ref] = node
        return recurse(node, _remember)

    walk_core_schema(schema, _remember)

    return found

134 

135 

def define_expected_missing_refs(
    schema: core_schema.CoreSchema, allowed_missing_refs: set[str]
) -> core_schema.CoreSchema | None:
    """Wrap `schema` in a definitions schema holding placeholders for refs that are still undefined.

    Returns None when `allowed_missing_refs` is empty or when every allowed ref is already
    defined somewhere inside `schema`. Otherwise each missing ref gets a `none` schema
    placeholder whose metadata carries the invalid-schema marker.
    """
    if not allowed_missing_refs:
        # Nothing could need substituting, so skip walking the schema altogether. This is the
        # common case (hit for all non-generic models), which makes the shortcut worthwhile.
        return None

    defined_refs = collect_definitions(schema).keys()
    still_missing = allowed_missing_refs.difference(defined_refs)
    if not still_missing:
        return None

    # TODO: Replace this with a (new) CoreSchema that, if present at any level, makes validation fail
    #   Issue: https://github.com/pydantic/pydantic-core/issues/619
    placeholders: list[core_schema.CoreSchema] = [
        core_schema.none_schema(ref=missing_ref, metadata={HAS_INVALID_SCHEMAS_METADATA_KEY: True})
        for missing_ref in still_missing
    ]
    return core_schema.definitions_schema(schema, placeholders)

156 

157 

def collect_invalid_schemas(schema: core_schema.CoreSchema) -> bool:
    """Report whether `schema` contains a schema flagged as invalid.

    The schema is walked and every schema whose metadata contains
    `HAS_INVALID_SCHEMAS_METADATA_KEY` is inspected. Schemas carrying the marker key are not
    descended into.

    Returns:
        True if at least one marker value found during the walk is truthy, False otherwise.
    """
    invalid = False

    def _is_schema_valid(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        nonlocal invalid
        if 'metadata' in s:
            metadata = s['metadata']
            if HAS_INVALID_SCHEMAS_METADATA_KEY in metadata:
                # Accumulate rather than overwrite: a falsy marker met later in the walk must
                # not erase a truthy one found earlier. Coerce so the result is always a bool.
                invalid = invalid or bool(metadata[HAS_INVALID_SCHEMAS_METADATA_KEY])
                return s
        return recurse(s, _is_schema_valid)

    walk_core_schema(schema, _is_schema_valid)
    return invalid

172 

173 

T = TypeVar('T')


# `Recurse` and `Walk` refer to each other; the forward reference is written as the string
# 'Walk' because `Walk` is only bound on the following statement.
Recurse = Callable[
    [core_schema.CoreSchema, 'Walk'],
    core_schema.CoreSchema,
]
Walk = Callable[
    [core_schema.CoreSchema, Recurse],
    core_schema.CoreSchema,
]

# TODO: Should we move _WalkCoreSchema into pydantic_core proper?
#   Issue: https://github.com/pydantic/pydantic-core/issues/615

182 

183 

class _WalkCoreSchema:
    """Recursive walker over core schema dicts.

    For every schema type in `core_schema.CoreSchemaType`, a handler named
    `handle_<type>_schema` (dashes replaced by underscores) is looked up on the instance; types
    without a dedicated handler fall back to `_handle_other_schemas`.

    Handlers are given a shallow copy of the schema (made in `_walk`), replace every nested
    schema with the result of `self.walk(nested, f)` and return the rebuilt schema.
    """

    def __init__(self):
        self._schema_type_to_method = self._build_schema_type_to_method()

    def _build_schema_type_to_method(self) -> dict[core_schema.CoreSchemaType, Recurse]:
        """Build the dispatch table mapping each schema type to its bound handler method."""
        mapping: dict[core_schema.CoreSchemaType, Recurse] = {}
        key: core_schema.CoreSchemaType
        for key in get_args(core_schema.CoreSchemaType):
            method_name = f"handle_{key.replace('-', '_')}_schema"
            mapping[key] = getattr(self, method_name, self._handle_other_schemas)
        return mapping

    def walk(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        """Call `f` on `schema`, handing it `self._walk` as the function that descends into children."""
        return f(schema, self._walk)

    def _walk(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        """Dispatch a shallow copy of `schema` to its handler, then walk its serialization schema, if any."""
        schema = self._schema_type_to_method[schema['type']](schema.copy(), f)
        ser_schema: core_schema.SerSchema | None = schema.get('serialization')  # type: ignore
        if ser_schema:
            schema['serialization'] = self._handle_ser_schemas(ser_schema, f)
        return schema

    def _handle_other_schemas(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        """Fallback handler: walk the schema stored under the 'schema' key, when there is one."""
        sub_schema = schema.get('schema', None)
        if sub_schema is not None:
            schema['schema'] = self.walk(sub_schema, f)  # type: ignore
        return schema

    def _handle_ser_schemas(self, ser_schema: core_schema.SerSchema, f: Walk) -> core_schema.SerSchema:
        """Walk the 'schema' and 'return_schema' entries of a serialization schema.

        The incoming dict is never mutated: `_walk` only shallow-copies the outer schema, so
        `ser_schema` is still the object owned by the caller's input. When something has to be
        replaced, a copy is made and returned; otherwise `ser_schema` itself is returned.
        """
        schema: core_schema.CoreSchema | None = ser_schema.get('schema', None)
        return_schema: core_schema.CoreSchema | None = ser_schema.get('return_schema', None)
        if schema is None and return_schema is None:
            return ser_schema

        ser_schema = ser_schema.copy()
        if schema is not None:
            ser_schema['schema'] = self.walk(schema, f)  # type: ignore
        if return_schema is not None:
            ser_schema['return_schema'] = self.walk(return_schema, f)  # type: ignore
        return ser_schema

    def handle_definitions_schema(self, schema: core_schema.DefinitionsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every definition and the inner schema of a definitions schema.

        Returns the walked inner schema alone when no definition survives and the schema has
        exactly three keys; otherwise a copy with the walked inner schema and definitions.
        """
        new_definitions: list[core_schema.CoreSchema] = []
        for definition in schema['definitions']:
            if 'schema_ref' in definition and 'ref' in definition:
                # This indicates a purposely indirect reference
                # We want to keep such references around for implications related to JSON schema, etc.:
                new_definitions.append(definition)
                # However, we still need to walk the referenced definition:
                self.walk(definition, f)
                continue

            updated_definition = self.walk(definition, f)
            if 'ref' in updated_definition:
                # If the updated definition schema doesn't have a 'ref', it shouldn't go in the definitions
                # This is most likely to happen due to replacing something with a definition reference, in
                # which case it should certainly not go in the definitions list
                new_definitions.append(updated_definition)
        new_inner_schema = self.walk(schema['schema'], f)

        if not new_definitions and len(schema) == 3:
            # This means we'd be returning a "trivial" definitions schema that just wrapped the inner schema
            return new_inner_schema

        new_schema = schema.copy()
        new_schema['schema'] = new_inner_schema
        new_schema['definitions'] = new_definitions
        return new_schema

    def handle_list_schema(self, schema: core_schema.ListSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional 'items_schema' of a list schema."""
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_set_schema(self, schema: core_schema.SetSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional 'items_schema' of a set schema."""
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_frozenset_schema(self, schema: core_schema.FrozenSetSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional 'items_schema' of a frozenset schema."""
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_generator_schema(self, schema: core_schema.GeneratorSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional 'items_schema' of a generator schema."""
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_tuple_schema(self, schema: core_schema.TupleSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk each entry of the tuple's 'items_schema' list."""
        schema['items_schema'] = [self.walk(v, f) for v in schema['items_schema']]
        return schema

    def handle_dict_schema(self, schema: core_schema.DictSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional 'keys_schema' and 'values_schema' of a dict schema."""
        keys_schema = schema.get('keys_schema')
        if keys_schema is not None:
            schema['keys_schema'] = self.walk(keys_schema, f)
        values_schema = schema.get('values_schema')
        if values_schema:
            schema['values_schema'] = self.walk(values_schema, f)
        return schema

    def handle_function_schema(self, schema: AnyFunctionSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the inner schema of a before/after/wrap function schema; other schemas are returned untouched.

        NOTE(review): the dispatch table only routes a schema type literally named 'function'
        to this method; confirm whether this is reached through dispatch or only by direct calls.
        """
        if not is_function_with_inner_schema(schema):
            return schema
        schema['schema'] = self.walk(schema['schema'], f)
        return schema

    def handle_union_schema(self, schema: core_schema.UnionSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every union choice; for tuple choices only the first element is walked."""
        new_choices: list[CoreSchema | tuple[CoreSchema, str]] = []
        for v in schema['choices']:
            if isinstance(v, tuple):
                new_choices.append((self.walk(v[0], f), v[1]))
            else:
                new_choices.append(self.walk(v, f))
        schema['choices'] = new_choices
        return schema

    def handle_tagged_union_schema(self, schema: core_schema.TaggedUnionSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every tagged-union choice that is a schema; `str`/`int` choice values are kept as they are."""
        new_choices: dict[Hashable, core_schema.CoreSchema] = {}
        for k, v in schema['choices'].items():
            new_choices[k] = v if isinstance(v, (str, int)) else self.walk(v, f)
        schema['choices'] = new_choices
        return schema

    def handle_chain_schema(self, schema: core_schema.ChainSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk each step of a chain schema."""
        schema['steps'] = [self.walk(v, f) for v in schema['steps']]
        return schema

    def handle_lax_or_strict_schema(self, schema: core_schema.LaxOrStrictSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the 'lax_schema' and then the 'strict_schema'."""
        schema['lax_schema'] = self.walk(schema['lax_schema'], f)
        schema['strict_schema'] = self.walk(schema['strict_schema'], f)
        return schema

    def handle_json_or_python_schema(self, schema: core_schema.JsonOrPythonSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the 'json_schema' and then the 'python_schema'."""
        schema['json_schema'] = self.walk(schema['json_schema'], f)
        schema['python_schema'] = self.walk(schema['python_schema'], f)
        return schema

    def handle_model_fields_schema(self, schema: core_schema.ModelFieldsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the extras schema, computed-field return schemas and field schemas of a model-fields schema.

        Each field and computed field is copied before its nested schema is replaced.
        """
        extras_schema = schema.get('extras_schema')
        if extras_schema is not None:
            schema['extras_schema'] = self.walk(extras_schema, f)
        replaced_fields: dict[str, core_schema.ModelField] = {}
        replaced_computed_fields: list[core_schema.ComputedField] = []
        for computed_field in schema.get('computed_fields', ()):
            replaced_field = computed_field.copy()
            replaced_field['return_schema'] = self.walk(computed_field['return_schema'], f)
            replaced_computed_fields.append(replaced_field)
        if replaced_computed_fields:
            schema['computed_fields'] = replaced_computed_fields
        for k, v in schema['fields'].items():
            replaced_field = v.copy()
            replaced_field['schema'] = self.walk(v['schema'], f)
            replaced_fields[k] = replaced_field
        schema['fields'] = replaced_fields
        return schema

    def handle_typed_dict_schema(self, schema: core_schema.TypedDictSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the extras schema, computed-field return schemas and field schemas of a typed-dict schema.

        Each field and computed field is copied before its nested schema is replaced.
        """
        extras_schema = schema.get('extras_schema')
        if extras_schema is not None:
            schema['extras_schema'] = self.walk(extras_schema, f)
        replaced_computed_fields: list[core_schema.ComputedField] = []
        for computed_field in schema.get('computed_fields', ()):
            replaced_field = computed_field.copy()
            replaced_field['return_schema'] = self.walk(computed_field['return_schema'], f)
            replaced_computed_fields.append(replaced_field)
        if replaced_computed_fields:
            schema['computed_fields'] = replaced_computed_fields
        replaced_fields: dict[str, core_schema.TypedDictField] = {}
        for k, v in schema['fields'].items():
            replaced_field = v.copy()
            replaced_field['schema'] = self.walk(v['schema'], f)
            replaced_fields[k] = replaced_field
        schema['fields'] = replaced_fields
        return schema

    def handle_dataclass_args_schema(self, schema: core_schema.DataclassArgsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the computed-field return schemas and field schemas of a dataclass-args schema.

        Each field and computed field is copied before its nested schema is replaced.
        """
        replaced_fields: list[core_schema.DataclassField] = []
        replaced_computed_fields: list[core_schema.ComputedField] = []
        for computed_field in schema.get('computed_fields', ()):
            replaced_field = computed_field.copy()
            replaced_field['return_schema'] = self.walk(computed_field['return_schema'], f)
            replaced_computed_fields.append(replaced_field)
        if replaced_computed_fields:
            schema['computed_fields'] = replaced_computed_fields
        for field in schema['fields']:
            replaced_field = field.copy()
            replaced_field['schema'] = self.walk(field['schema'], f)
            replaced_fields.append(replaced_field)
        schema['fields'] = replaced_fields
        return schema

    def handle_arguments_schema(self, schema: core_schema.ArgumentsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk each parameter schema, then the var-args and var-kwargs schemas when those keys are present."""
        replaced_arguments_schema: list[core_schema.ArgumentsParameter] = []
        for param in schema['arguments_schema']:
            replaced_param = param.copy()
            replaced_param['schema'] = self.walk(param['schema'], f)
            replaced_arguments_schema.append(replaced_param)
        schema['arguments_schema'] = replaced_arguments_schema
        if 'var_args_schema' in schema:
            schema['var_args_schema'] = self.walk(schema['var_args_schema'], f)
        if 'var_kwargs_schema' in schema:
            schema['var_kwargs_schema'] = self.walk(schema['var_kwargs_schema'], f)
        return schema

    def handle_call_schema(self, schema: core_schema.CallSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the 'arguments_schema' and, when present, the 'return_schema' of a call schema."""
        schema['arguments_schema'] = self.walk(schema['arguments_schema'], f)
        if 'return_schema' in schema:
            schema['return_schema'] = self.walk(schema['return_schema'], f)
        return schema

395 

396 

# Shared module-level walker; its bound `walk` is the first "recurse" callable handed to `f`.
_dispatch = _WalkCoreSchema().walk


def walk_core_schema(schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
    """Recursively traverse a CoreSchema.

    Args:
        schema (core_schema.CoreSchema): The CoreSchema to process. `f` is handed a shallow copy
            of it, so the top-level dict itself is not the object passed to `f`.
        f (Walk): A function to apply. This function takes two arguments:
            1. The current CoreSchema that is being processed
               (not the same one you passed into this function, one level down).
            2. The "next" `f` to call. This lets you for example use `f=functools.partial(some_method, some_context)`
               to pass data down the recursive calls without using globals or other mutable state.

            Note that the callable passed for the top-level schema is the walker's `walk`, which
            calls `f` again on the schema it is given before descending into its children.

    Returns:
        core_schema.CoreSchema: A processed CoreSchema.
    """
    top_level_copy = schema.copy()
    return f(top_level_copy, _dispatch)

415 

416 

def simplify_schema_references(schema: core_schema.CoreSchema) -> core_schema.CoreSchema:  # noqa: C901
    """Collect every referenced schema into one definitions table and inline the trivial ones.

    The schema is walked three times:

    1. `collect_refs` stores every schema that carries a ref (as reported by `get_ref`) in
       `definitions` and replaces it with a `definition-ref` schema. Nested `definitions`
       schemas are unwrapped: their definitions are registered and their inner schema is kept.
    2. `count_refs` counts how many times each ref is used and records which refs are
       reached again while their own definition is still being walked (i.e. are recursive).
    3. `inline_refs` replaces a `definition-ref` with its definition when `can_be_inlined`
       allows it.

    Args:
        schema: The CoreSchema to simplify.

    Returns:
        The simplified schema, wrapped in a `definitions` schema if any definition is still
        referenced after inlining.
    """
    definitions: dict[str, core_schema.CoreSchema] = {}
    ref_counts: dict[str, int] = defaultdict(int)
    involved_in_recursion: dict[str, bool] = {}
    current_recursion_ref_count: dict[str, int] = defaultdict(int)

    def collect_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        if s['type'] == 'definitions':
            for definition in s['definitions']:
                ref = get_ref(definition)
                assert ref is not None
                # the first definition registered for a given ref wins
                if ref not in definitions:
                    definitions[ref] = definition
                recurse(definition, collect_refs)
            return recurse(s['schema'], collect_refs)

        ref = get_ref(s)
        if ref is None:
            return recurse(s, collect_refs)

        new = recurse(s, collect_refs)
        new_ref = get_ref(new)
        if new_ref:
            definitions[new_ref] = new
        return core_schema.definition_reference_schema(schema_ref=ref)

    schema = walk_core_schema(schema, collect_refs)

    def count_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        if s['type'] != 'definition-ref':
            return recurse(s, count_refs)
        ref = s['schema_ref']
        ref_counts[ref] += 1

        if ref_counts[ref] >= 2:
            # If this model is involved in a recursion this should be detected
            # on its second encounter, we can safely stop the walk here.
            if current_recursion_ref_count[ref] != 0:
                involved_in_recursion[ref] = True
            return s

        # first encounter: walk into the definition while marking the ref as "in progress"
        current_recursion_ref_count[ref] += 1
        recurse(definitions[ref], count_refs)
        current_recursion_ref_count[ref] -= 1
        return s

    schema = walk_core_schema(schema, count_refs)

    assert all(c == 0 for c in current_recursion_ref_count.values()), 'this is a bug! please report it'

    def can_be_inlined(s: core_schema.DefinitionReferenceSchema, ref: str) -> bool:
        """Return whether the definition-ref `s` may be replaced by the definition of `ref`."""
        if ref_counts[ref] > 1:
            return False
        if involved_in_recursion.get(ref, False):
            return False
        if 'serialization' in s:
            return False
        if 'metadata' in s:
            metadata = s['metadata']
            for k in (
                'pydantic_js_functions',
                'pydantic_js_annotation_functions',
                'pydantic.internal.union_discriminator',
            ):
                if k in metadata:
                    # we need to keep this as a ref
                    return False
        return True

    def inline_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        if s['type'] == 'definition-ref':
            ref = s['schema_ref']
            # Only inline references that are used once, are not involved in recursion and
            # whose def-ref schema carries no extra keys that must be kept (see `can_be_inlined`).
            if can_be_inlined(s, ref):
                # Inline the reference by replacing the reference with the actual schema.
                # `can_be_inlined` rejects def-refs that have a `serialization` key, so there
                # is nothing to carry over from `s` onto the inlined schema.
                new = definitions.pop(ref)
                ref_counts[ref] -= 1  # because we just replaced it!
                return recurse(new, inline_refs)
        return recurse(s, inline_refs)

    schema = walk_core_schema(schema, inline_refs)

    # keep only the definitions that are still referenced somewhere
    def_values = [v for v in definitions.values() if ref_counts[v['ref']] > 0]  # type: ignore

    if def_values:
        schema = core_schema.definitions_schema(schema=schema, definitions=def_values)
    return schema

513 

514 

def _strip_metadata(schema: CoreSchema) -> CoreSchema:
    """Return a copy of `schema` with `metadata` keys and a few default values removed.

    The input schema is not modified: each visited schema, each model field and each
    computed field is shallow-copied before keys are removed from it.

    Args:
        schema: The CoreSchema to strip.

    Returns:
        The stripped CoreSchema produced by walking `schema`.
    """

    def strip_metadata(s: CoreSchema, recurse: Recurse) -> CoreSchema:
        s = s.copy()
        s.pop('metadata', None)
        if s['type'] == 'model-fields':
            stripped_fields = {k: v.copy() for k, v in s['fields'].items()}
            for field_schema in stripped_fields.values():
                field_schema.pop('metadata', None)
            s['fields'] = stripped_fields

            computed_fields = s.get('computed_fields', None)
            if computed_fields:
                # Strip the copies, not the originals: popping from the original computed
                # fields would mutate the caller's schema and leave metadata in the result.
                stripped_computed = [cf.copy() for cf in computed_fields]
                for cf in stripped_computed:
                    cf.pop('metadata', None)
                s['computed_fields'] = stripped_computed
            else:
                # drop a missing/empty `computed_fields` entry altogether
                s.pop('computed_fields', None)
        elif s['type'] == 'model':
            # remove some defaults
            if s.get('custom_init', True) is False:
                s.pop('custom_init')
            if s.get('root_model', True) is False:
                s.pop('root_model')
            # a config that is absent, empty or only contains `title` is dropped
            if {'title'}.issuperset(s.get('config', {}).keys()):
                s.pop('config', None)

        return recurse(s, strip_metadata)

    return walk_core_schema(schema, strip_metadata)

544 

545 

def pretty_print_core_schema(
    schema: CoreSchema,
    include_metadata: bool = False,
) -> None:
    """Print a CoreSchema to the console with rich.

    Meant as a debugging aid only; `rich` has to be installed in the environment.

    Args:
        schema: The CoreSchema to print.
        include_metadata: When `False` (the default), the schema is passed through
            `_strip_metadata` before it is printed.
    """
    from rich import print as rich_print  # type: ignore  # install it manually in your dev env

    printable = schema if include_metadata else _strip_metadata(schema)
    return rich_print(printable)

563 

564 

def validate_core_schema(schema: CoreSchema) -> CoreSchema:
    """Validate `schema` with pydantic-core unless validation is disabled via the environment.

    When the `PYDANTIC_SKIP_VALIDATING_CORE_SCHEMAS` environment variable is present (with
    any value), the schema is returned as-is without being validated.

    Args:
        schema: The CoreSchema to validate.

    Returns:
        The result of pydantic-core's schema validation, or `schema` itself when skipped.
    """
    skip_validation = 'PYDANTIC_SKIP_VALIDATING_CORE_SCHEMAS' in os.environ
    return schema if skip_validation else _validate_core_schema(schema)