Coverage for pydantic/_internal/_generics.py: 93.44%

226 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-13 19:35 +0000

1from __future__ import annotations 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

2 

3import sys 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

4import types 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

5import typing 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

6from collections import ChainMap 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

7from collections.abc import Iterator, Mapping 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

8from contextlib import contextmanager 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

9from contextvars import ContextVar 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

10from itertools import zip_longest 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

11from types import prepare_class 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

12from typing import TYPE_CHECKING, Any, TypeVar 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

13from weakref import WeakValueDictionary 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

14 

15import typing_extensions 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

16 

17from . import _typing_extra 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

18from ._core_utils import get_type_ref 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

19from ._forward_ref import PydanticRecursiveRef 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

20from ._utils import all_identical, is_model_class 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

21 

22if sys.version_info >= (3, 10): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

23 from typing import _UnionGenericAlias # type: ignore[attr-defined] 1abcdefzAghijklmCDGHIJKLnopqrsEF

24 

25if TYPE_CHECKING: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

26 from ..main import BaseModel 

27 

28GenericTypesCacheKey = tuple[Any, Any, tuple[Any, ...]] 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

29 

30# Note: We want to remove LimitedDict, but to do this, we'd need to improve the handling of generics caching. 

31# Right now, to handle recursive generics, we some types must remain cached for brief periods without references. 

32# By chaining the WeakValuesDict with a LimitedDict, we have a way to retain caching for all types with references, 

33# while also retaining a limited number of types even without references. This is generally enough to build 

34# specific recursive generic models without losing required items out of the cache. 

35 

36KT = TypeVar('KT') 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

37VT = TypeVar('VT') 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

38_LIMITED_DICT_SIZE = 100 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

39 

40 

41class LimitedDict(dict[KT, VT]): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

42 def __init__(self, size_limit: int = _LIMITED_DICT_SIZE) -> None: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

43 self.size_limit = size_limit 1tuabcdefzABgvwhijklmCDxynopqrsEF

44 super().__init__() 1tuabcdefzABgvwhijklmCDxynopqrsEF

45 

46 def __setitem__(self, key: KT, value: VT, /) -> None: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

47 super().__setitem__(key, value) 1tuabcdefzABgvwhijklmCDxynopqrsEF

48 if len(self) > self.size_limit: 1tuabcdefzABgvwhijklmCDxynopqrsEF

49 excess = len(self) - self.size_limit + self.size_limit // 10 1tuabcdefzABgvwhijklmCDxynopqrsEF

50 to_remove = list(self.keys())[:excess] 1tuabcdefzABgvwhijklmCDxynopqrsEF

51 for k in to_remove: 1tuabcdefzABgvwhijklmCDxynopqrsEF

52 del self[k] 1tuabcdefzABgvwhijklmCDxynopqrsEF

53 

54 

55# weak dictionaries allow the dynamically created parametrized versions of generic models to get collected 

56# once they are no longer referenced by the caller. 

57GenericTypesCache = WeakValueDictionary[GenericTypesCacheKey, 'type[BaseModel]'] 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

58 

59if TYPE_CHECKING: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

60 

61 class DeepChainMap(ChainMap[KT, VT]): # type: ignore 

62 ... 

63 

64else: 

65 

66 class DeepChainMap(ChainMap): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

67 """Variant of ChainMap that allows direct updates to inner scopes. 

68 

69 Taken from https://docs.python.org/3/library/collections.html#collections.ChainMap, 

70 with some light modifications for this use case. 

71 """ 

72 

73 def clear(self) -> None: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

74 for mapping in self.maps: 

75 mapping.clear() 

76 

77 def __setitem__(self, key: KT, value: VT) -> None: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

78 for mapping in self.maps: 

79 mapping[key] = value 

80 

81 def __delitem__(self, key: KT) -> None: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

82 hit = False 

83 for mapping in self.maps: 

84 if key in mapping: 

85 del mapping[key] 

86 hit = True 

87 if not hit: 

88 raise KeyError(key) 

89 

90 

91# Despite the fact that LimitedDict _seems_ no longer necessary, I'm very nervous to actually remove it 

92# and discover later on that we need to re-add all this infrastructure... 

93# _GENERIC_TYPES_CACHE = DeepChainMap(GenericTypesCache(), LimitedDict()) 

94 

95_GENERIC_TYPES_CACHE = GenericTypesCache() 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

96 

97 

98class PydanticGenericMetadata(typing_extensions.TypedDict): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

99 origin: type[BaseModel] | None # analogous to typing._GenericAlias.__origin__ 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

100 args: tuple[Any, ...] # analogous to typing._GenericAlias.__args__ 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

101 parameters: tuple[TypeVar, ...] # analogous to typing.Generic.__parameters__ 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

102 

103 

104def create_generic_submodel( 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

105 model_name: str, origin: type[BaseModel], args: tuple[Any, ...], params: tuple[Any, ...] 

106) -> type[BaseModel]: 

107 """Dynamically create a submodel of a provided (generic) BaseModel. 

108 

109 This is used when producing concrete parametrizations of generic models. This function 

110 only *creates* the new subclass; the schema/validators/serialization must be updated to 

111 reflect a concrete parametrization elsewhere. 

112 

113 Args: 

114 model_name: The name of the newly created model. 

115 origin: The base class for the new model to inherit from. 

116 args: A tuple of generic metadata arguments. 

117 params: A tuple of generic metadata parameters. 

118 

119 Returns: 

120 The created submodel. 

121 """ 

122 namespace: dict[str, Any] = {'__module__': origin.__module__} 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

123 bases = (origin,) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

124 meta, ns, kwds = prepare_class(model_name, bases) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

125 namespace.update(ns) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

126 created_model = meta( 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

127 model_name, 

128 bases, 

129 namespace, 

130 __pydantic_generic_metadata__={ 

131 'origin': origin, 

132 'args': args, 

133 'parameters': params, 

134 }, 

135 __pydantic_reset_parent_namespace__=False, 

136 **kwds, 

137 ) 

138 

139 model_module, called_globally = _get_caller_frame_info(depth=3) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

140 if called_globally: # create global reference and therefore allow pickling 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

141 object_by_reference = None 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

142 reference_name = model_name 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

143 reference_module_globals = sys.modules[created_model.__module__].__dict__ 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

144 while object_by_reference is not created_model: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

145 object_by_reference = reference_module_globals.setdefault(reference_name, created_model) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

146 reference_name += '_' 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

147 

148 return created_model 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

149 

150 

151def _get_caller_frame_info(depth: int = 2) -> tuple[str | None, bool]: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

152 """Used inside a function to check whether it was called globally. 

153 

154 Args: 

155 depth: The depth to get the frame. 

156 

157 Returns: 

158 A tuple contains `module_name` and `called_globally`. 

159 

160 Raises: 

161 RuntimeError: If the function is not called inside a function. 

162 """ 

163 try: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

164 previous_caller_frame = sys._getframe(depth) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

165 except ValueError as e: 1tuabcdefzABgvwhijklmCDxynopqrsEF

166 raise RuntimeError('This function must be used inside another function') from e 1tuabcdefzABgvwhijklmCDxynopqrsEF

167 except AttributeError: # sys module does not have _getframe function, so there's nothing we can do about it 1tuabcdefzABgvwhijklmCDxynopqrsEF

168 return None, False 1tuabcdefzABgvwhijklmCDxynopqrsEF

169 frame_globals = previous_caller_frame.f_globals 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

170 return frame_globals.get('__name__'), previous_caller_frame.f_locals is frame_globals 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

171 

172 

173DictValues: type[Any] = {}.values().__class__ 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

174 

175 

176def iter_contained_typevars(v: Any) -> Iterator[TypeVar]: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

177 """Recursively iterate through all subtypes and type args of `v` and yield any typevars that are found. 

178 

179 This is inspired as an alternative to directly accessing the `__parameters__` attribute of a GenericAlias, 

180 since __parameters__ of (nested) generic BaseModel subclasses won't show up in that list. 

181 """ 

182 if isinstance(v, TypeVar): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

183 yield v 1tuabcdefzABgvwhijklmCDxynopqrsEF

184 elif is_model_class(v): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

185 yield from v.__pydantic_generic_metadata__['parameters'] 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

186 elif isinstance(v, (DictValues, list)): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

187 for var in v: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

188 yield from iter_contained_typevars(var) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

189 else: 

190 args = get_args(v) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

191 for arg in args: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

192 yield from iter_contained_typevars(arg) 1tuabcdefzABgvwhijklmCDxynopqrsEF

193 

194 

195def get_args(v: Any) -> Any: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

196 pydantic_generic_metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

197 if pydantic_generic_metadata: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

198 return pydantic_generic_metadata.get('args') 1tuabcdefzABgvwhijklmCDxynopqrsEF

199 return typing_extensions.get_args(v) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

200 

201 

202def get_origin(v: Any) -> Any: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

203 pydantic_generic_metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

204 if pydantic_generic_metadata: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

205 return pydantic_generic_metadata.get('origin') 1tuabcdefzABgvwhijklmCDxynopqrsEF

206 return typing_extensions.get_origin(v) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

207 

208 

209def get_standard_typevars_map(cls: Any) -> dict[TypeVar, Any] | None: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

210 """Package a generic type's typevars and parametrization (if present) into a dictionary compatible with the 

211 `replace_types` function. Specifically, this works with standard typing generics and typing._GenericAlias. 

212 """ 

213 origin = get_origin(cls) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

214 if origin is None: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

215 return None 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

216 if not hasattr(origin, '__parameters__'): 1tuabcdefzABgvwhijklmCDxynopqrsEF

217 return None 1tuabcdefzABgvwhijklmCDxynopqrsEF

218 

219 # In this case, we know that cls is a _GenericAlias, and origin is the generic type 

220 # So it is safe to access cls.__args__ and origin.__parameters__ 

221 args: tuple[Any, ...] = cls.__args__ # type: ignore 1tuabcdefzABgvwhijklmCDxynopqrsEF

222 parameters: tuple[TypeVar, ...] = origin.__parameters__ 1tuabcdefzABgvwhijklmCDxynopqrsEF

223 return dict(zip(parameters, args)) 1tuabcdefzABgvwhijklmCDxynopqrsEF

224 

225 

226def get_model_typevars_map(cls: type[BaseModel]) -> dict[TypeVar, Any]: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

227 """Package a generic BaseModel's typevars and concrete parametrization (if present) into a dictionary compatible 

228 with the `replace_types` function. 

229 

230 Since BaseModel.__class_getitem__ does not produce a typing._GenericAlias, and the BaseModel generic info is 

231 stored in the __pydantic_generic_metadata__ attribute, we need special handling here. 

232 """ 

233 # TODO: This could be unified with `get_standard_typevars_map` if we stored the generic metadata 

234 # in the __origin__, __args__, and __parameters__ attributes of the model. 

235 generic_metadata = cls.__pydantic_generic_metadata__ 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

236 origin = generic_metadata['origin'] 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

237 args = generic_metadata['args'] 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

238 if not args: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

239 # No need to go into `iter_contained_typevars`: 

240 return {} 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

241 return dict(zip(iter_contained_typevars(origin), args)) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

242 

243 

244def replace_types(type_: Any, type_map: Mapping[TypeVar, Any] | None) -> Any: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

245 """Return type with all occurrences of `type_map` keys recursively replaced with their values. 

246 

247 Args: 

248 type_: The class or generic alias. 

249 type_map: Mapping from `TypeVar` instance to concrete types. 

250 

251 Returns: 

252 A new type representing the basic structure of `type_` with all 

253 `typevar_map` keys recursively replaced. 

254 

255 Example: 

256 ```python 

257 from typing import List, Union 

258 

259 from pydantic._internal._generics import replace_types 

260 

261 replace_types(tuple[str, Union[List[str], float]], {str: int}) 

262 #> tuple[int, Union[List[int], float]] 

263 ``` 

264 """ 

265 if not type_map: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

266 return type_ 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

267 

268 type_args = get_args(type_) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

269 

270 if _typing_extra.is_annotated(type_): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

271 annotated_type, *annotations = type_args 1tuabcdefzABgvwhijklmCDxynopqrsEF

272 annotated = replace_types(annotated_type, type_map) 1tuabcdefzABgvwhijklmCDxynopqrsEF

273 for annotation in annotations: 1tuabcdefzABgvwhijklmCDxynopqrsEF

274 annotated = typing.Annotated[annotated, annotation] 1tuabcdefzABgvwhijklmCDxynopqrsEF

275 return annotated 1tuabcdefzABgvwhijklmCDxynopqrsEF

276 

277 origin_type = get_origin(type_) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

278 

279 # Having type args is a good indicator that this is a typing special form 

280 # instance or a generic alias of some sort. 

281 if type_args: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

282 resolved_type_args = tuple(replace_types(arg, type_map) for arg in type_args) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

283 if all_identical(type_args, resolved_type_args): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

284 # If all arguments are the same, there is no need to modify the 

285 # type or create a new object at all 

286 return type_ 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

287 

288 if ( 1tuBgvwxy

289 origin_type is not None 

290 and isinstance(type_, _typing_extra.typing_base) 

291 and not isinstance(origin_type, _typing_extra.typing_base) 

292 and getattr(type_, '_name', None) is not None 

293 ): 

294 # In python < 3.9 generic aliases don't exist so any of these like `list`, 

295 # `type` or `collections.abc.Callable` need to be translated. 

296 # See: https://www.python.org/dev/peps/pep-0585 

297 origin_type = getattr(typing, type_._name) 1tuabcdefzABgvwhijklmCDxynopqrsEF

298 assert origin_type is not None 1tuabcdefzABgvwhijklmCDxynopqrsEF

299 

300 if _typing_extra.origin_is_union(origin_type): 1tuabcdefzABgvwhijklmCDxynopqrsEF

301 if any(_typing_extra.is_any(arg) for arg in resolved_type_args): 1tuabcdefzABgvwhijklmCDxynopqrsEF

302 # `Any | T` ~ `Any`: 

303 resolved_type_args = (Any,) 1tuabcdefzABgvwhijklmCDxynopqrsEF

304 # `Never | T` ~ `T`: 

305 resolved_type_args = tuple( 1tuabcdefzABgvwhijklmCDxynopqrsEF

306 arg 

307 for arg in resolved_type_args 

308 if not (_typing_extra.is_no_return(arg) or _typing_extra.is_never(arg)) 

309 ) 

310 

311 # PEP-604 syntax (Ex.: list | str) is represented with a types.UnionType object that does not have __getitem__. 

312 # We also cannot use isinstance() since we have to compare types. 

313 if sys.version_info >= (3, 10) and origin_type is types.UnionType: 1tuabcdefzABgvwhijklmCDxynopqrsEF

314 return _UnionGenericAlias(origin_type, resolved_type_args) 1abcdefzAghijklmCDnopqrsEF

315 # NotRequired[T] and Required[T] don't support tuple type resolved_type_args, hence the condition below 

316 return origin_type[resolved_type_args[0] if len(resolved_type_args) == 1 else resolved_type_args] 1tuabcdefzABgvwhijklmCDxynopqrsEF

317 

318 # We handle pydantic generic models separately as they don't have the same 

319 # semantics as "typing" classes or generic aliases 

320 

321 if not origin_type and is_model_class(type_): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

322 parameters = type_.__pydantic_generic_metadata__['parameters'] 1tuabcdefzABgvwhijklmCDxynopqrsEF

323 if not parameters: 1tuabcdefzABgvwhijklmCDxynopqrsEF

324 return type_ 1tuabcdefzABgvwhijklmCDxynopqrsEF

325 resolved_type_args = tuple(replace_types(t, type_map) for t in parameters) 1tuabcdefzABgvwhijklmCDxynopqrsEF

326 if all_identical(parameters, resolved_type_args): 1tuabcdefzABgvwhijklmCDxynopqrsEF

327 return type_ 1tuabcdefzABgvwhijklmCDxynopqrsEF

328 return type_[resolved_type_args] 1tuabcdefzABgvwhijklmCDxynopqrsEF

329 

330 # Handle special case for typehints that can have lists as arguments. 

331 # `typing.Callable[[int, str], int]` is an example for this. 

332 if isinstance(type_, list): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

333 resolved_list = [replace_types(element, type_map) for element in type_] 1tuabcdefzABgvwhijklmCDxynopqrsEF

334 if all_identical(type_, resolved_list): 1tuabcdefzABgvwhijklmCDxynopqrsEF

335 return type_ 1tuabcdefzABgvwhijklmCDxynopqrsEF

336 return resolved_list 1tuabcdefzABgvwhijklmCDxynopqrsEF

337 

338 # If all else fails, we try to resolve the type directly and otherwise just 

339 # return the input with no modifications. 

340 return type_map.get(type_, type_) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

341 

342 

343def map_generic_model_arguments(cls: type[BaseModel], args: tuple[Any, ...]) -> dict[TypeVar, Any]: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

344 """Return a mapping between the arguments of a generic model and the provided arguments during parametrization. 

345 

346 Raises: 

347 TypeError: If the number of arguments does not match the parameters (i.e. if providing too few or too many arguments). 

348 

349 Example: 

350 ```python {test="skip" lint="skip"} 

351 class Model[T, U, V = int](BaseModel): ... 

352 

353 map_generic_model_arguments(Model, (str, bytes)) 

354 #> {T: str, U: bytes, V: int} 

355 

356 map_generic_model_arguments(Model, (str,)) 

357 #> TypeError: Too few arguments for <class '__main__.Model'>; actual 1, expected at least 2 

358 

359 map_generic_model_argumenst(Model, (str, bytes, int, complex)) 

360 #> TypeError: Too many arguments for <class '__main__.Model'>; actual 4, expected 3 

361 ``` 

362 

363 Note: 

364 This function is analogous to the private `typing._check_generic_specialization` function. 

365 """ 

366 parameters = cls.__pydantic_generic_metadata__['parameters'] 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

367 expected_len = len(parameters) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

368 typevars_map: dict[TypeVar, Any] = {} 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

369 

370 _missing = object() 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

371 for parameter, argument in zip_longest(parameters, args, fillvalue=_missing): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

372 if parameter is _missing: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

373 raise TypeError(f'Too many arguments for {cls}; actual {len(args)}, expected {expected_len}') 1tuabcdefzABgvwhijklmCDxynopqrsEF

374 

375 if argument is _missing: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

376 param = typing.cast(TypeVar, parameter) 1tuabcdefzABgvwhijklmCDxynopqrsEF

377 try: 1tuabcdefzABgvwhijklmCDxynopqrsEF

378 has_default = param.has_default() 1tuabcdefzABgvwhijklmCDxynopqrsEF

379 except AttributeError: 1tuabcdefBgvwhijklmxynopqrs

380 # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13. 

381 has_default = False 1tuabcdefBgvwhijklmxynopqrs

382 if has_default: 1tuabcdefzABgvwhijklmCDxynopqrsEF

383 typevars_map[param] = param.__default__ 1tuabcdefzABgvwhijklmCDxynopqrsEF

384 else: 

385 expected_len -= sum(hasattr(p, 'has_default') and p.has_default() for p in parameters) 1tuabcdefzABgvwhijklmCDxynopqrsEF

386 raise TypeError(f'Too few arguments for {cls}; actual {len(args)}, expected at least {expected_len}') 1tuabcdefzABgvwhijklmCDxynopqrsEF

387 else: 

388 param = typing.cast(TypeVar, parameter) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

389 typevars_map[param] = argument 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

390 

391 return typevars_map 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

392 

393 

394_generic_recursion_cache: ContextVar[set[str] | None] = ContextVar('_generic_recursion_cache', default=None) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

395 

396 

397@contextmanager 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

398def generic_recursion_self_type( 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

399 origin: type[BaseModel], args: tuple[Any, ...] 

400) -> Iterator[PydanticRecursiveRef | None]: 

401 """This contextmanager should be placed around the recursive calls used to build a generic type, 

402 and accept as arguments the generic origin type and the type arguments being passed to it. 

403 

404 If the same origin and arguments are observed twice, it implies that a self-reference placeholder 

405 can be used while building the core schema, and will produce a schema_ref that will be valid in the 

406 final parent schema. 

407 """ 

408 previously_seen_type_refs = _generic_recursion_cache.get() 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

409 if previously_seen_type_refs is None: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

410 previously_seen_type_refs = set() 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

411 token = _generic_recursion_cache.set(previously_seen_type_refs) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

412 else: 

413 token = None 1tuabcdefzABgvwhijklmCDxynopqrsEF

414 

415 try: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

416 type_ref = get_type_ref(origin, args_override=args) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

417 if type_ref in previously_seen_type_refs: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

418 self_type = PydanticRecursiveRef(type_ref=type_ref) 1tuabcdefzABgvwhijklmCDxynopqrsEF

419 yield self_type 1tuabcdefzABgvwhijklmCDxynopqrsEF

420 else: 

421 previously_seen_type_refs.add(type_ref) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

422 yield 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

423 previously_seen_type_refs.remove(type_ref) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

424 finally: 

425 if token: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

426 _generic_recursion_cache.reset(token) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

427 

428 

429def recursively_defined_type_refs() -> set[str]: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

430 visited = _generic_recursion_cache.get() 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

431 if not visited: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

432 return set() # not in a generic recursion, so there are no types 1tuabcdefzABgvwhijklmCDxynopqrsEF

433 

434 return visited.copy() # don't allow modifications 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

435 

436 

437def get_cached_generic_type_early(parent: type[BaseModel], typevar_values: Any) -> type[BaseModel] | None: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

438 """The use of a two-stage cache lookup approach was necessary to have the highest performance possible for 

439 repeated calls to `__class_getitem__` on generic types (which may happen in tighter loops during runtime), 

440 while still ensuring that certain alternative parametrizations ultimately resolve to the same type. 

441 

442 As a concrete example, this approach was necessary to make Model[List[T]][int] equal to Model[List[int]]. 

443 The approach could be modified to not use two different cache keys at different points, but the 

444 _early_cache_key is optimized to be as quick to compute as possible (for repeated-access speed), and the 

445 _late_cache_key is optimized to be as "correct" as possible, so that two types that will ultimately be the 

446 same after resolving the type arguments will always produce cache hits. 

447 

448 If we wanted to move to only using a single cache key per type, we would either need to always use the 

449 slower/more computationally intensive logic associated with _late_cache_key, or would need to accept 

450 that Model[List[T]][int] is a different type than Model[List[T]][int]. Because we rely on subclass relationships 

451 during validation, I think it is worthwhile to ensure that types that are functionally equivalent are actually 

452 equal. 

453 """ 

454 return _GENERIC_TYPES_CACHE.get(_early_cache_key(parent, typevar_values)) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

455 

456 

457def get_cached_generic_type_late( 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

458 parent: type[BaseModel], typevar_values: Any, origin: type[BaseModel], args: tuple[Any, ...] 

459) -> type[BaseModel] | None: 

460 """See the docstring of `get_cached_generic_type_early` for more information about the two-stage cache lookup.""" 

461 cached = _GENERIC_TYPES_CACHE.get(_late_cache_key(origin, args, typevar_values)) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

462 if cached is not None: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

463 set_cached_generic_type(parent, typevar_values, cached, origin, args) 1tuabcdefzABgvwhijklmCDxynopqrsEF

464 return cached 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

465 

466 

467def set_cached_generic_type( 1tuabcdefzAvwhijklmCDMGHIJKLxynopqrsEF

468 parent: type[BaseModel], 

469 typevar_values: tuple[Any, ...], 

470 type_: type[BaseModel], 

471 origin: type[BaseModel] | None = None, 

472 args: tuple[Any, ...] | None = None, 

473) -> None: 

474 """See the docstring of `get_cached_generic_type_early` for more information about why items are cached with 

475 two different keys. 

476 """ 

477 _GENERIC_TYPES_CACHE[_early_cache_key(parent, typevar_values)] = type_ 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

478 if len(typevar_values) == 1: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

479 _GENERIC_TYPES_CACHE[_early_cache_key(parent, typevar_values[0])] = type_ 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

480 if origin and args: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

481 _GENERIC_TYPES_CACHE[_late_cache_key(origin, args, typevar_values)] = type_ 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

482 

483 

484def _union_orderings_key(typevar_values: Any) -> Any: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

485 """This is intended to help differentiate between Union types with the same arguments in different order. 

486 

487 Thanks to caching internal to the `typing` module, it is not possible to distinguish between 

488 List[Union[int, float]] and List[Union[float, int]] (and similarly for other "parent" origins besides List) 

489 because `typing` considers Union[int, float] to be equal to Union[float, int]. 

490 

491 However, you _can_ distinguish between (top-level) Union[int, float] vs. Union[float, int]. 

492 Because we parse items as the first Union type that is successful, we get slightly more consistent behavior 

493 if we make an effort to distinguish the ordering of items in a union. It would be best if we could _always_ 

494 get the exact-correct order of items in the union, but that would require a change to the `typing` module itself. 

495 (See https://github.com/python/cpython/issues/86483 for reference.) 

496 """ 

497 if isinstance(typevar_values, tuple): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

498 args_data = [] 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

499 for value in typevar_values: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

500 args_data.append(_union_orderings_key(value)) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

501 return tuple(args_data) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

502 elif _typing_extra.is_union(typevar_values): 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

503 return get_args(typevar_values) 1tuabcdefzABgvwhijklmCDxynopqrsEF

504 else: 

505 return () 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

506 

507 

508def _early_cache_key(cls: type[BaseModel], typevar_values: Any) -> GenericTypesCacheKey: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

509 """This is intended for minimal computational overhead during lookups of cached types. 

510 

511 Note that this is overly simplistic, and it's possible that two different cls/typevar_values 

512 inputs would ultimately result in the same type being created in BaseModel.__class_getitem__. 

513 To handle this, we have a fallback _late_cache_key that is checked later if the _early_cache_key 

514 lookup fails, and should result in a cache hit _precisely_ when the inputs to __class_getitem__ 

515 would result in the same type. 

516 """ 

517 return cls, typevar_values, _union_orderings_key(typevar_values) 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

518 

519 

520def _late_cache_key(origin: type[BaseModel], args: tuple[Any, ...], typevar_values: Any) -> GenericTypesCacheKey: 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF

521 """This is intended for use later in the process of creating a new type, when we have more information 

522 about the exact args that will be passed. If it turns out that a different set of inputs to 

523 __class_getitem__ resulted in the same inputs to the generic type creation process, we can still 

524 return the cached type, and update the cache with the _early_cache_key as well. 

525 """ 

526 # The _union_orderings_key is placed at the start here to ensure there cannot be a collision with an 

527 # _early_cache_key, as that function will always produce a BaseModel subclass as the first item in the key, 

528 # whereas this function will always produce a tuple as the first item in the key. 

529 return _union_orderings_key(typevar_values), origin, args 1tuabcdefzABgvwhijklmCDMGHIJKLxynopqrsEF