Coverage for pydantic/_internal/_core_utils.py: 96.49%

367 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2024-06-21 17:00 +0000

1from __future__ import annotations 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

2 

3import os 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

4from collections import defaultdict 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

5from typing import ( 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

6 Any, 

7 Callable, 

8 Hashable, 

9 TypeVar, 

10 Union, 

11) 

12 

13from pydantic_core import CoreSchema, core_schema 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

14from pydantic_core import validate_core_schema as _validate_core_schema 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

15from typing_extensions import TypeAliasType, TypeGuard, get_args, get_origin 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

16 

17from . import _repr 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

18from ._typing_extra import is_generic_alias 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

19 

# Every "function-*" validator schema, including the plain variant.
AnyFunctionSchema = Union[
    core_schema.AfterValidatorFunctionSchema,
    core_schema.BeforeValidatorFunctionSchema,
    core_schema.WrapValidatorFunctionSchema,
    core_schema.PlainValidatorFunctionSchema,
]


# The function validator schemas that `is_function_with_inner_schema` narrows to
# (everything in `AnyFunctionSchema` except the plain variant).
FunctionSchemaWithInnerSchema = Union[
    core_schema.AfterValidatorFunctionSchema,
    core_schema.BeforeValidatorFunctionSchema,
    core_schema.WrapValidatorFunctionSchema,
]

# Field-level schemas, as opposed to full core schemas.
CoreSchemaField = Union[
    core_schema.ModelField, core_schema.DataclassField, core_schema.TypedDictField, core_schema.ComputedField
]
CoreSchemaOrField = Union[core_schema.CoreSchema, CoreSchemaField]

# `'type'` values used by the `is_*` type guards in this module.
_CORE_SCHEMA_FIELD_TYPES = {'typed-dict-field', 'dataclass-field', 'model-field', 'computed-field'}
_FUNCTION_WITH_INNER_SCHEMA_TYPES = {'function-before', 'function-after', 'function-wrap'}
_LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES = {'list', 'set', 'frozenset'}

TAGGED_UNION_TAG_KEY = 'pydantic.internal.tagged_union_tag'
"""
Used in a `Tag` schema to specify the tag used for a discriminated union.
"""
HAS_INVALID_SCHEMAS_METADATA_KEY = 'pydantic.internal.invalid'
"""Used to mark a schema that is invalid because it refers to a definition that was not yet defined when the
schema was first encountered.
"""

51 

52 

def is_core_schema(
    schema: CoreSchemaOrField,
) -> TypeGuard[CoreSchema]:
    """Type guard: `schema` is a core schema rather than a field-level schema.

    A schema qualifies when its `'type'` is not one of `_CORE_SCHEMA_FIELD_TYPES`.
    """
    schema_type = schema['type']
    is_field = schema_type in _CORE_SCHEMA_FIELD_TYPES
    return not is_field

57 

58 

def is_core_schema_field(
    schema: CoreSchemaOrField,
) -> TypeGuard[CoreSchemaField]:
    """Type guard: `schema` is a field-level schema.

    A schema qualifies when its `'type'` is one of `_CORE_SCHEMA_FIELD_TYPES`.
    """
    schema_type = schema['type']
    return schema_type in _CORE_SCHEMA_FIELD_TYPES

63 

64 

def is_function_with_inner_schema(
    schema: CoreSchemaOrField,
) -> TypeGuard[FunctionSchemaWithInnerSchema]:
    """Type guard: `schema` is a before/after/wrap function validator schema.

    The check is purely on `schema['type']` against `_FUNCTION_WITH_INNER_SCHEMA_TYPES`.
    """
    schema_type = schema['type']
    return schema_type in _FUNCTION_WITH_INNER_SCHEMA_TYPES

69 

70 

def is_list_like_schema_with_items_schema(
    schema: CoreSchema,
) -> TypeGuard[core_schema.ListSchema | core_schema.SetSchema | core_schema.FrozenSetSchema]:
    """Type guard: `schema` is a list, set or frozenset schema.

    The check is purely on `schema['type']` against `_LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES`.
    """
    schema_type = schema['type']
    return schema_type in _LIST_LIKE_SCHEMA_WITH_ITEMS_TYPES

75 

76 

def get_type_ref(type_: type[Any], args_override: tuple[type[Any], ...] | None = None) -> str:
    """Build the ref string that identifies `type_` in pydantic_core's core schemas.

    The ref has the shape `<module>.<name>:<id(origin)>`, followed by a bracketed, comma-separated
    list of per-argument refs when the type has generic arguments.

    Args:
        type_: The type to produce a ref for.
        args_override: Generic arguments to use when `type_` is not itself a generic alias. This makes
            it possible to create valid recursive references for generic models without having to
            create a concrete class.

    Returns:
        The ref string.
    """
    origin = get_origin(type_) or type_

    if is_generic_alias(type_):
        args = get_args(type_)
    else:
        args = args_override or ()

    # Generic metadata, when present and non-empty, takes precedence over what typing introspection reports.
    generic_metadata = getattr(type_, '__pydantic_generic_metadata__', None)
    if generic_metadata:
        origin = generic_metadata['origin'] or origin
        args = generic_metadata['args'] or args

    module_name = getattr(origin, '__module__', '<No __module__>')
    if isinstance(origin, TypeAliasType):
        display_name = origin.__name__
    else:
        try:
            display_name = getattr(origin, '__qualname__', f'<No __qualname__: {origin}>')
        except Exception:
            # Building the fallback text (or the lookup itself) failed; retry with a static fallback.
            display_name = getattr(origin, '__qualname__', '<No __qualname__>')
    type_ref = f'{module_name}.{display_name}:{id(origin)}'

    # Handle string literals as a special case; we may be able to remove this special handling if we
    # wrap them in a ForwardRef at some point.
    arg_refs: list[str] = [
        f'{arg}:str-{id(arg)}' if isinstance(arg, str) else f'{_repr.display_as_type(arg)}:{id(arg)}'
        for arg in args
    ]
    if not arg_refs:
        return type_ref
    joined_args = ','.join(arg_refs)
    return f'{type_ref}[{joined_args}]'

113 

114 

def get_ref(s: core_schema.CoreSchema) -> None | str:
    """Return the schema's `'ref'` entry, or `None` when it has none.

    The helper exists only so that type checking works correctly.
    """
    return s.get('ref')

120 

121 

def collect_definitions(schema: core_schema.CoreSchema) -> dict[str, core_schema.CoreSchema]:
    """Walk `schema` and gather every schema that carries a non-empty `'ref'`.

    Args:
        schema: The core schema to walk.

    Returns:
        A mapping from ref to the schema it was found on (as seen before descending into it).
    """
    found: dict[str, CoreSchema] = {}

    def _visit(node: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        node_ref = get_ref(node)
        if node_ref:
            found[node_ref] = node
        return recurse(node, _visit)

    walk_core_schema(schema, _visit)
    return found

134 

135 

def define_expected_missing_refs(
    schema: core_schema.CoreSchema, allowed_missing_refs: set[str]
) -> core_schema.CoreSchema | None:
    """Wrap `schema` with placeholder definitions for refs that are allowed to be missing.

    Args:
        schema: The core schema to inspect.
        allowed_missing_refs: Refs that may legitimately have no definition inside `schema`.

    Returns:
        A definitions schema wrapping `schema`, with one placeholder definition (flagged via
        `HAS_INVALID_SCHEMAS_METADATA_KEY`) per allowed ref that `schema` does not define;
        `None` when there is nothing to add.
    """
    if not allowed_missing_refs:
        # in this case, there are no missing refs to potentially substitute, so there's no need to walk the schema
        # this is a common case (will be hit for all non-generic models), so it's worth optimizing for
        return None

    defined_refs = collect_definitions(schema).keys()
    still_missing = allowed_missing_refs.difference(defined_refs)
    if not still_missing:
        return None

    placeholders: list[core_schema.CoreSchema] = []
    for missing_ref in still_missing:
        # TODO: Replace this with a (new) CoreSchema that, if present at any level, makes validation fail
        #   Issue: https://github.com/pydantic/pydantic-core/issues/619
        placeholder = core_schema.none_schema(ref=missing_ref, metadata={HAS_INVALID_SCHEMAS_METADATA_KEY: True})
        placeholders.append(placeholder)
    return core_schema.definitions_schema(schema, placeholders)

156 

157 

def collect_invalid_schemas(schema: core_schema.CoreSchema) -> bool:
    """Report whether any schema reached while walking `schema` is flagged as invalid.

    A schema is flagged when its metadata maps `HAS_INVALID_SCHEMAS_METADATA_KEY` to a truthy value.
    The walk does not descend into a schema whose metadata contains that key.

    Args:
        schema: The core schema to walk.

    Returns:
        `True` if at least one flagged schema was found, `False` otherwise.
    """
    invalid = False

    def _is_schema_valid(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
        nonlocal invalid
        metadata = s.get('metadata')
        if metadata is not None and HAS_INVALID_SCHEMAS_METADATA_KEY in metadata:
            # Only ever raise the flag: a falsy marker seen later must not hide an earlier invalid schema.
            if metadata[HAS_INVALID_SCHEMAS_METADATA_KEY]:
                invalid = True
            return s
        return recurse(s, _is_schema_valid)

    walk_core_schema(schema, _is_schema_valid)
    return invalid

172 

173 

T = TypeVar('T')


# Callback a visitor uses to descend: called with a schema and the visitor to apply to its sub-schemas.
Recurse = Callable[[core_schema.CoreSchema, 'Walk'], core_schema.CoreSchema]
# Visitor accepted by `walk_core_schema`: called with the current schema and a `Recurse` callback.
Walk = Callable[[core_schema.CoreSchema, Recurse], core_schema.CoreSchema]

179 

180# TODO: Should we move _WalkCoreSchema into pydantic_core proper? 

181# Issue: https://github.com/pydantic/pydantic-core/issues/615 

182 

183 

class _WalkCoreSchema:
    """Walker that applies a visitor function to a core schema and its nested sub-schemas.

    `walk` hands the schema to the visitor together with `_walk`; when the visitor calls that
    callback, `_walk` shallow-copies the schema and dispatches on `schema['type']` to a
    `handle_*_schema` method (or to `_handle_other_schemas`), which in turn calls `walk` on each
    sub-schema it knows about and stores the results on the copy.
    """

    def __init__(self):
        self._schema_type_to_method = self._build_schema_type_to_method()

    def _build_schema_type_to_method(self) -> dict[core_schema.CoreSchemaType, Recurse]:
        """Build the dispatch table from schema type name to handler method.

        A type name such as `'tagged-union'` maps to `handle_tagged_union_schema` when this class
        defines such a method, and to `_handle_other_schemas` otherwise.
        """
        mapping: dict[core_schema.CoreSchemaType, Recurse] = {}
        key: core_schema.CoreSchemaType
        for key in get_args(core_schema.CoreSchemaType):
            method_name = f"handle_{key.replace('-', '_')}_schema"
            mapping[key] = getattr(self, method_name, self._handle_other_schemas)
        return mapping

    def walk(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        """Apply the visitor `f` to `schema`, giving it `_walk` as the callback for descending."""
        return f(schema, self._walk)

    def _walk(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        """Descend into `schema`: run its type-specific handler on a shallow copy, then walk `'serialization'`."""
        schema = self._schema_type_to_method[schema['type']](schema.copy(), f)
        ser_schema: core_schema.SerSchema | None = schema.get('serialization')  # type: ignore
        if ser_schema:
            schema['serialization'] = self._handle_ser_schemas(ser_schema, f)
        return schema

    def _handle_other_schemas(self, schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
        """Fallback handler: walk the `'schema'` entry if there is one."""
        sub_schema = schema.get('schema', None)
        if sub_schema is not None:
            schema['schema'] = self.walk(sub_schema, f)  # type: ignore
        return schema

    def _handle_ser_schemas(self, ser_schema: core_schema.SerSchema, f: Walk) -> core_schema.SerSchema:
        """Walk the `'schema'` and `'return_schema'` entries of a serialization schema.

        `_walk` only shallow-copies the outer schema, so `ser_schema` is still the caller's own dict.
        It is therefore copied before being updated; it is returned as-is when there is nothing to walk.
        """
        schema: core_schema.CoreSchema | None = ser_schema.get('schema', None)
        return_schema: core_schema.CoreSchema | None = ser_schema.get('return_schema', None)
        if schema is not None or return_schema is not None:
            ser_schema = ser_schema.copy()
            if schema is not None:
                ser_schema['schema'] = self.walk(schema, f)  # type: ignore
            if return_schema is not None:
                ser_schema['return_schema'] = self.walk(return_schema, f)  # type: ignore
        return ser_schema

    def handle_definitions_schema(self, schema: core_schema.DefinitionsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every definition and the inner schema of a definitions schema.

        Definitions that lose their `'ref'` while being walked are dropped. If no definitions remain
        and the schema has exactly three keys, the walked inner schema is returned on its own.
        """
        new_definitions: list[core_schema.CoreSchema] = []
        for definition in schema['definitions']:
            if 'schema_ref' in definition and 'ref' in definition:
                # This indicates a purposely indirect reference
                # We want to keep such references around for implications related to JSON schema, etc.:
                new_definitions.append(definition)
                # However, we still need to walk the referenced definition:
                self.walk(definition, f)
                continue

            updated_definition = self.walk(definition, f)
            if 'ref' in updated_definition:
                # If the updated definition schema doesn't have a 'ref', it shouldn't go in the definitions
                # This is most likely to happen due to replacing something with a definition reference, in
                # which case it should certainly not go in the definitions list
                new_definitions.append(updated_definition)
        new_inner_schema = self.walk(schema['schema'], f)

        if not new_definitions and len(schema) == 3:
            # This means we'd be returning a "trivial" definitions schema that just wrapped the inner schema
            return new_inner_schema

        new_schema = schema.copy()
        new_schema['schema'] = new_inner_schema
        new_schema['definitions'] = new_definitions
        return new_schema

    def handle_list_schema(self, schema: core_schema.ListSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `'items_schema'` of a list schema."""
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_set_schema(self, schema: core_schema.SetSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `'items_schema'` of a set schema."""
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_frozenset_schema(self, schema: core_schema.FrozenSetSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `'items_schema'` of a frozenset schema."""
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_generator_schema(self, schema: core_schema.GeneratorSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `'items_schema'` of a generator schema."""
        items_schema = schema.get('items_schema')
        if items_schema is not None:
            schema['items_schema'] = self.walk(items_schema, f)
        return schema

    def handle_tuple_schema(self, schema: core_schema.TupleSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk each entry of a tuple schema's `'items_schema'` list."""
        schema['items_schema'] = [self.walk(v, f) for v in schema['items_schema']]
        return schema

    def handle_dict_schema(self, schema: core_schema.DictSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the optional `'keys_schema'` and `'values_schema'` of a dict schema."""
        keys_schema = schema.get('keys_schema')
        if keys_schema is not None:
            schema['keys_schema'] = self.walk(keys_schema, f)
        values_schema = schema.get('values_schema')
        if values_schema:
            schema['values_schema'] = self.walk(values_schema, f)
        return schema

    def handle_function_schema(self, schema: AnyFunctionSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the inner `'schema'` of a function schema; other function schemas are returned untouched."""
        if not is_function_with_inner_schema(schema):
            return schema
        schema['schema'] = self.walk(schema['schema'], f)
        return schema

    def handle_union_schema(self, schema: core_schema.UnionSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every choice of a union schema; for tuple choices only the first element is walked."""
        new_choices: list[CoreSchema | tuple[CoreSchema, str]] = []
        for v in schema['choices']:
            if isinstance(v, tuple):
                new_choices.append((self.walk(v[0], f), v[1]))
            else:
                new_choices.append(self.walk(v, f))
        schema['choices'] = new_choices
        return schema

    def handle_tagged_union_schema(self, schema: core_schema.TaggedUnionSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk every choice of a tagged union schema, keeping `str`/`int` choice values as they are."""
        new_choices: dict[Hashable, core_schema.CoreSchema] = {}
        for k, v in schema['choices'].items():
            new_choices[k] = v if isinstance(v, (str, int)) else self.walk(v, f)
        schema['choices'] = new_choices
        return schema

    def handle_chain_schema(self, schema: core_schema.ChainSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk each step of a chain schema."""
        schema['steps'] = [self.walk(v, f) for v in schema['steps']]
        return schema

    def handle_lax_or_strict_schema(self, schema: core_schema.LaxOrStrictSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk both the `'lax_schema'` and the `'strict_schema'`."""
        schema['lax_schema'] = self.walk(schema['lax_schema'], f)
        schema['strict_schema'] = self.walk(schema['strict_schema'], f)
        return schema

    def handle_json_or_python_schema(self, schema: core_schema.JsonOrPythonSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk both the `'json_schema'` and the `'python_schema'`."""
        schema['json_schema'] = self.walk(schema['json_schema'], f)
        schema['python_schema'] = self.walk(schema['python_schema'], f)
        return schema

    def handle_model_fields_schema(self, schema: core_schema.ModelFieldsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the extras schema, the computed fields' return schemas and each field schema.

        Fields and computed fields are copied before being updated.
        """
        extras_schema = schema.get('extras_schema')
        if extras_schema is not None:
            schema['extras_schema'] = self.walk(extras_schema, f)
        replaced_fields: dict[str, core_schema.ModelField] = {}
        replaced_computed_fields: list[core_schema.ComputedField] = []
        for computed_field in schema.get('computed_fields', ()):
            replaced_field = computed_field.copy()
            replaced_field['return_schema'] = self.walk(computed_field['return_schema'], f)
            replaced_computed_fields.append(replaced_field)
        if replaced_computed_fields:
            schema['computed_fields'] = replaced_computed_fields
        for k, v in schema['fields'].items():
            replaced_field = v.copy()
            replaced_field['schema'] = self.walk(v['schema'], f)
            replaced_fields[k] = replaced_field
        schema['fields'] = replaced_fields
        return schema

    def handle_typed_dict_schema(self, schema: core_schema.TypedDictSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the extras schema, the computed fields' return schemas and each field schema.

        Fields and computed fields are copied before being updated.
        """
        extras_schema = schema.get('extras_schema')
        if extras_schema is not None:
            schema['extras_schema'] = self.walk(extras_schema, f)
        replaced_computed_fields: list[core_schema.ComputedField] = []
        for computed_field in schema.get('computed_fields', ()):
            replaced_field = computed_field.copy()
            replaced_field['return_schema'] = self.walk(computed_field['return_schema'], f)
            replaced_computed_fields.append(replaced_field)
        if replaced_computed_fields:
            schema['computed_fields'] = replaced_computed_fields
        replaced_fields: dict[str, core_schema.TypedDictField] = {}
        for k, v in schema['fields'].items():
            replaced_field = v.copy()
            replaced_field['schema'] = self.walk(v['schema'], f)
            replaced_fields[k] = replaced_field
        schema['fields'] = replaced_fields
        return schema

    def handle_dataclass_args_schema(self, schema: core_schema.DataclassArgsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the computed fields' return schemas and each field schema of a dataclass-args schema.

        Fields and computed fields are copied before being updated.
        """
        replaced_fields: list[core_schema.DataclassField] = []
        replaced_computed_fields: list[core_schema.ComputedField] = []
        for computed_field in schema.get('computed_fields', ()):
            replaced_field = computed_field.copy()
            replaced_field['return_schema'] = self.walk(computed_field['return_schema'], f)
            replaced_computed_fields.append(replaced_field)
        if replaced_computed_fields:
            schema['computed_fields'] = replaced_computed_fields
        for field in schema['fields']:
            replaced_field = field.copy()
            replaced_field['schema'] = self.walk(field['schema'], f)
            replaced_fields.append(replaced_field)
        schema['fields'] = replaced_fields
        return schema

    def handle_arguments_schema(self, schema: core_schema.ArgumentsSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk each parameter schema plus the optional var-args and var-kwargs schemas.

        Parameters are copied before being updated.
        """
        replaced_arguments_schema: list[core_schema.ArgumentsParameter] = []
        for param in schema['arguments_schema']:
            replaced_param = param.copy()
            replaced_param['schema'] = self.walk(param['schema'], f)
            replaced_arguments_schema.append(replaced_param)
        schema['arguments_schema'] = replaced_arguments_schema
        if 'var_args_schema' in schema:
            schema['var_args_schema'] = self.walk(schema['var_args_schema'], f)
        if 'var_kwargs_schema' in schema:
            schema['var_kwargs_schema'] = self.walk(schema['var_kwargs_schema'], f)
        return schema

    def handle_call_schema(self, schema: core_schema.CallSchema, f: Walk) -> core_schema.CoreSchema:
        """Walk the `'arguments_schema'` and the optional `'return_schema'` of a call schema."""
        schema['arguments_schema'] = self.walk(schema['arguments_schema'], f)
        if 'return_schema' in schema:
            schema['return_schema'] = self.walk(schema['return_schema'], f)
        return schema

395 

396 

_dispatch = _WalkCoreSchema().walk


def walk_core_schema(schema: core_schema.CoreSchema, f: Walk) -> core_schema.CoreSchema:
    """Recursively traverse a CoreSchema.

    Args:
        schema (core_schema.CoreSchema): The CoreSchema to process. `f` is given a shallow copy of it
            rather than the object itself.
        f (Walk): A function to apply. This function takes two arguments:
            1. The current CoreSchema that is being processed
               (not the same one you passed into this function, one level down).
            2. The "next" `f` to call. This lets you for example use `f=functools.partial(some_method, some_context)`
               to pass data down the recursive calls without using globals or other mutable state.

    Returns:
        core_schema.CoreSchema: A processed CoreSchema.
    """
    top_level_copy = schema.copy()
    return f(top_level_copy, _dispatch)

415 

416 

417def simplify_schema_references(schema: core_schema.CoreSchema) -> core_schema.CoreSchema: # noqa: C901 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

418 definitions: dict[str, core_schema.CoreSchema] = {} 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

419 ref_counts: dict[str, int] = defaultdict(int) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

420 involved_in_recursion: dict[str, bool] = {} 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

421 current_recursion_ref_count: dict[str, int] = defaultdict(int) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

422 

423 def collect_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

424 if s['type'] == 'definitions': 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

425 for definition in s['definitions']: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

426 ref = get_ref(definition) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

427 assert ref is not None 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

428 if ref not in definitions: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

429 definitions[ref] = definition 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

430 recurse(definition, collect_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

431 return recurse(s['schema'], collect_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

432 else: 

433 ref = get_ref(s) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

434 if ref is not None: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

435 new = recurse(s, collect_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

436 new_ref = get_ref(new) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

437 if new_ref: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

438 definitions[new_ref] = new 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

439 return core_schema.definition_reference_schema(schema_ref=ref) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

440 else: 

441 return recurse(s, collect_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

442 

443 schema = walk_core_schema(schema, collect_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

444 

def count_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
    """Walk callback that tallies definition references and flags recursive ones.

    Every `definition-ref` schema bumps `ref_counts` for its `schema_ref`. The
    referenced definition is only walked on the first encounter; on any later
    encounter the walk stops, and the ref is marked in `involved_in_recursion`
    if its definition is still being walked at that moment.

    NOTE(review): `ref_counts`, `current_recursion_ref_count`,
    `involved_in_recursion` and `definitions` come from the enclosing scope;
    the counters are presumably zero-defaulting mappings -- confirm there.
    """
    if s['type'] == 'definition-ref':
        target = s['schema_ref']
        ref_counts[target] += 1

        if ref_counts[target] < 2:
            # First encounter: descend into the definition, tracking that we are
            # currently inside it so a nested hit can be recognised as recursion.
            current_recursion_ref_count[target] += 1
            recurse(definitions[target], count_refs)
            current_recursion_ref_count[target] -= 1
        elif current_recursion_ref_count[target] != 0:
            # Seen again while its own definition is still on the walk stack.
            involved_in_recursion[target] = True

        # The def-ref schema itself is always returned unchanged.
        return s

    return recurse(s, count_refs)

462 

463 schema = walk_core_schema(schema, count_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

464 

465 assert all(c == 0 for c in current_recursion_ref_count.values()), 'this is a bug! please report it' 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

466 

def can_be_inlined(s: core_schema.DefinitionReferenceSchema, ref: str) -> bool:
    """Tell whether the def-ref schema `s` may be replaced by its definition.

    Inlining is refused when the reference is counted more than once, when it
    is flagged as involved in recursion, when the def-ref carries its own
    `serialization`, or when its metadata holds one of the keys that require
    the reference to be kept.

    Args:
        s: The `definition-ref` schema being considered.
        ref: The reference name taken from `s`.

    Returns:
        `True` if none of the conditions above apply, otherwise `False`.
    """
    if ref_counts[ref] > 1:
        return False
    if involved_in_recursion.get(ref, False):
        return False
    if 'serialization' in s:
        return False
    if 'metadata' in s:
        # Any of these metadata keys means we need to keep this as a ref.
        blocking_keys = (
            'pydantic_js_functions',
            'pydantic_js_annotation_functions',
            'pydantic.internal.union_discriminator',
        )
        ref_metadata = s['metadata']
        if any(key in ref_metadata for key in blocking_keys):
            return False
    return True

485 

def inline_refs(s: core_schema.CoreSchema, recurse: Recurse) -> core_schema.CoreSchema:
    """Walk callback that swaps inlinable `definition-ref` schemas for their definitions.

    Anything that is not a `definition-ref`, or that `can_be_inlined` rejects,
    is simply walked further. Otherwise the definition is removed from
    `definitions`, its reference count is decremented, and the walk continues
    inside the inlined schema.
    """
    if s['type'] != 'definition-ref':
        return recurse(s, inline_refs)

    ref = s['schema_ref']
    # Only references used once, not involved in recursion and without extra
    # keys (like 'serialization') are eligible.
    if not can_be_inlined(s, ref):
        return recurse(s, inline_refs)

    # Replace the reference with the actual schema.
    inlined = definitions.pop(ref)
    ref_counts[ref] -= 1  # because we just replaced it!

    # Carry over keys that lived on the def-ref schema; in particular this is
    # needed for `serialization`. `can_be_inlined` currently rejects def-refs
    # that have this key, so the branch is defensive.
    if 'serialization' in s:
        inlined['serialization'] = s['serialization']

    return recurse(inlined, inline_refs)

505 

506 schema = walk_core_schema(schema, inline_refs) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

507 

508 def_values = [v for v in definitions.values() if ref_counts[v['ref']] > 0] # type: ignore 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

509 

510 if def_values: 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

511 schema = core_schema.definitions_schema(schema=schema, definitions=def_values) 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

512 return schema 1tuvwabcdefFsxyzAghijklNOGHIJKLMBCDEmnopqr

513 

514 

def _strip_metadata(schema: CoreSchema) -> CoreSchema:
    """Build a version of `schema` without `metadata`, for compact debug output.

    Each visited schema dict is shallow-copied before keys are removed, so the
    dicts passed in by the caller are not modified by this function's own
    `pop` calls.

    Besides `metadata`, a few noisy defaults are dropped:

    * on `model-fields` schemas: per-field and per-computed-field `metadata`,
      and an empty/missing `computed_fields` entry;
    * on `model` schemas: `custom_init` / `root_model` when they are `False`,
      and `config` when it has no keys other than `title`.

    Args:
        schema: The core schema to clean up.

    Returns:
        The result of walking `schema` with the stripping callback.
    """

    def strip_metadata(s: CoreSchema, recurse: Recurse) -> CoreSchema:
        # Work on a shallow copy so the pops below never touch the input dict.
        s = s.copy()
        s.pop('metadata', None)

        if s['type'] == 'model-fields':
            # Copy every field before stripping it, then install the copies.
            stripped_fields = {}
            for field_name, field_schema in s['fields'].items():
                field_copy = field_schema.copy()
                field_copy.pop('metadata', None)
                stripped_fields[field_name] = field_copy
            s['fields'] = stripped_fields

            computed_fields = s.get('computed_fields', None)
            if computed_fields:
                # Strip the *copies* that end up on `s`; popping from the
                # original entries would mutate the caller's schema and leave
                # the metadata in the returned one.
                stripped_computed = [cf.copy() for cf in computed_fields]
                for cf in stripped_computed:
                    cf.pop('metadata', None)
                s['computed_fields'] = stripped_computed
            else:
                # Missing or empty: drop the key entirely.
                s.pop('computed_fields', None)
        elif s['type'] == 'model':
            # remove some defaults
            if s.get('custom_init', True) is False:
                s.pop('custom_init')
            if s.get('root_model', True) is False:
                s.pop('root_model')
            # A config that only carries `title` (or nothing) adds no information.
            if {'title'}.issuperset(s.get('config', {}).keys()):
                s.pop('config', None)

        return recurse(s, strip_metadata)

    return walk_core_schema(schema, strip_metadata)

544 

545 

def pretty_print_core_schema(
    schema: CoreSchema,
    include_metadata: bool = False,
) -> None:
    """Pretty print a CoreSchema using rich.

    This is a debugging helper; `rich` is imported lazily inside the function,
    so it is only needed when the helper is actually called.

    Args:
        schema: The CoreSchema to print.
        include_metadata: Whether to include metadata in the output. Defaults to `False`,
            in which case the schema is passed through `_strip_metadata` first.
    """
    from rich import print as rich_print  # type: ignore  # install it manually in your dev env

    printable = schema if include_metadata else _strip_metadata(schema)
    return rich_print(printable)

563 

564 

def validate_core_schema(schema: CoreSchema) -> CoreSchema:
    """Validate `schema` unless validation is disabled through the environment.

    When the `PYDANTIC_SKIP_VALIDATING_CORE_SCHEMAS` environment variable is set
    (to any value, even an empty one), the schema is returned as-is.

    Args:
        schema: The core schema to validate.

    Returns:
        The untouched `schema` when validation is skipped, otherwise the result
        of `_validate_core_schema(schema)`.
    """
    skip_validation = 'PYDANTIC_SKIP_VALIDATING_CORE_SCHEMAS' in os.environ
    return schema if skip_validation else _validate_core_schema(schema)