Coverage for pydantic/_internal/_generics.py: 93.56%

232 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-26 07:45 +0000

1from __future__ import annotations 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

2 

3import sys 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

4import types 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

5import typing 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

6from collections import ChainMap 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

7from collections.abc import Iterator, Mapping 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

8from contextlib import contextmanager 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

9from contextvars import ContextVar 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

10from itertools import zip_longest 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

11from types import prepare_class 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

12from typing import TYPE_CHECKING, Annotated, Any, TypeVar 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

13from weakref import WeakValueDictionary 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

14 

15import typing_extensions 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

16from typing_inspection import typing_objects 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

17from typing_inspection.introspection import is_union_origin 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

18 

19from . import _typing_extra 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

20from ._core_utils import get_type_ref 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

21from ._forward_ref import PydanticRecursiveRef 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

22from ._utils import all_identical, is_model_class 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

23 

24if sys.version_info >= (3, 10): 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

25 from typing import _UnionGenericAlias # type: ignore[attr-defined] 1defghiABCaklmnopDEFKLMNOPqrstuvGHI

26 

27if TYPE_CHECKING: 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

28 from ..main import BaseModel 

29 

# Shape shared by both cache keys used in this module: a 3-tuple whose last item is itself a tuple.
# See `_early_cache_key` and `_late_cache_key` for what is stored in each position.
GenericTypesCacheKey = tuple[Any, Any, tuple[Any, ...]]

31 

# Note: We would like to drop LimitedDict, but that first requires better handling of generics caching.
# Today, in order to handle recursive generics, some types must stay cached for brief periods even though
# nothing references them. Chaining the weak-value dictionary with a LimitedDict keeps every referenced type
# cached, while also holding on to a bounded number of unreferenced ones. In practice that is enough to build
# specific recursive generic models without losing required entries from the cache.

KT = TypeVar('KT')
VT = TypeVar('VT')
_LIMITED_DICT_SIZE = 100


class LimitedDict(dict[KT, VT]):
    """A `dict` that drops its oldest entries whenever it grows beyond `size_limit`."""

    def __init__(self, size_limit: int = _LIMITED_DICT_SIZE) -> None:
        self.size_limit = size_limit
        super().__init__()

    def __setitem__(self, key: KT, value: VT, /) -> None:
        super().__setitem__(key, value)
        overflow = len(self) - self.size_limit
        if overflow <= 0:
            return
        # Evict the overflow plus an extra tenth of the limit, starting from the
        # oldest keys (dicts iterate in insertion order).
        evict_count = overflow + self.size_limit // 10
        for stale_key in list(self)[:evict_count]:
            del self[stale_key]

55 

56 

# Cache type mapping a `GenericTypesCacheKey` to a parametrized model class.
# Weak dictionaries allow the dynamically created parametrized versions of generic models to get collected
# once they are no longer referenced by the caller.
GenericTypesCache = WeakValueDictionary[GenericTypesCacheKey, 'type[BaseModel]']

60 

if TYPE_CHECKING:

    class DeepChainMap(ChainMap[KT, VT]):  # type: ignore
        ...

else:

    class DeepChainMap(ChainMap):
        """A `ChainMap` whose mutating operations reach every underlying mapping.

        Adapted from https://docs.python.org/3/library/collections.html#collections.ChainMap,
        with some light modifications for this use case.
        """

        def clear(self) -> None:
            """Empty every mapping in the chain."""
            for scope in self.maps:
                scope.clear()

        def __setitem__(self, key: KT, value: VT) -> None:
            """Store `value` under `key` in every mapping in the chain."""
            for scope in self.maps:
                scope[key] = value

        def __delitem__(self, key: KT) -> None:
            """Remove `key` from each mapping holding it; raise `KeyError` if none does."""
            removed = 0
            for scope in self.maps:
                if key in scope:
                    del scope[key]
                    removed += 1
            if removed == 0:
                raise KeyError(key)

91 

92 

# Despite the fact that LimitedDict _seems_ no longer necessary, I'm very nervous to actually remove it
# and discover later on that we need to re-add all this infrastructure...
# _GENERIC_TYPES_CACHE = DeepChainMap(GenericTypesCache(), LimitedDict())

# Context-local cache of parametrized generic models. It starts out as `None`; the cache helpers
# in this module create a `GenericTypesCache` on first use and store it here.
_GENERIC_TYPES_CACHE: ContextVar[GenericTypesCache | None] = ContextVar('_GENERIC_TYPES_CACHE', default=None)

98 

99 

class PydanticGenericMetadata(typing_extensions.TypedDict):
    """Generic bookkeeping stored on a model as `__pydantic_generic_metadata__`.

    The three keys mirror the attributes the `typing` module keeps on its own generic aliases.
    """

    origin: type[BaseModel] | None  # analogous to typing._GenericAlias.__origin__
    args: tuple[Any, ...]  # analogous to typing._GenericAlias.__args__
    parameters: tuple[TypeVar, ...]  # analogous to typing.Generic.__parameters__

104 

105 

def create_generic_submodel(
    model_name: str, origin: type[BaseModel], args: tuple[Any, ...], params: tuple[Any, ...]
) -> type[BaseModel]:
    """Dynamically create a submodel of a provided (generic) BaseModel.

    This is used when producing concrete parametrizations of generic models. This function
    only *creates* the new subclass; the schema/validators/serialization must be updated to
    reflect a concrete parametrization elsewhere.

    Args:
        model_name: The name of the newly created model.
        origin: The base class for the new model to inherit from.
        args: A tuple of generic metadata arguments.
        params: A tuple of generic metadata parameters.

    Returns:
        The created submodel.
    """
    namespace: dict[str, Any] = {'__module__': origin.__module__}
    bases = (origin,)
    # Let `prepare_class` pick the metaclass and initial namespace, as a `class` statement would.
    meta, ns, kwds = prepare_class(model_name, bases)
    namespace.update(ns)
    created_model = meta(
        model_name,
        bases,
        namespace,
        __pydantic_generic_metadata__={
            'origin': origin,
            'args': args,
            'parameters': params,
        },
        __pydantic_reset_parent_namespace__=False,
        **kwds,
    )

    # Only the "called globally" flag is needed; the caller's module name is discarded.
    _, called_globally = _get_caller_frame_info(depth=3)
    if called_globally:  # create global reference and therefore allow pickling
        object_by_reference = None
        reference_name = model_name
        reference_module_globals = sys.modules[created_model.__module__].__dict__
        # `setdefault` never overwrites an existing global: if the name is taken by a different
        # object, keep appending underscores until the model itself is what gets stored.
        while object_by_reference is not created_model:
            object_by_reference = reference_module_globals.setdefault(reference_name, created_model)
            reference_name += '_'

    return created_model

151 

152 

153def _get_caller_frame_info(depth: int = 2) -> tuple[str | None, bool]: 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

154 """Used inside a function to check whether it was called globally. 

155 

156 Args: 

157 depth: The depth to get the frame. 

158 

159 Returns: 

160 A tuple contains `module_name` and `called_globally`. 

161 

162 Raises: 

163 RuntimeError: If the function is not called inside a function. 

164 """ 

165 try: 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

166 previous_caller_frame = sys._getframe(depth) 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

167 except ValueError as e: 1bcdefghiABCjawxklmnopDEFyzqrstuvGHI

168 raise RuntimeError('This function must be used inside another function') from e 1bcdefghiABCjawxklmnopDEFyzqrstuvGHI

169 except AttributeError: # sys module does not have _getframe function, so there's nothing we can do about it 1bcdefghiABCjawxklmnopDEFyzqrstuvGHI

170 return None, False 1bcdefghiABCjawxklmnopDEFyzqrstuvGHI

171 frame_globals = previous_caller_frame.f_globals 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

172 return frame_globals.get('__name__'), previous_caller_frame.f_locals is frame_globals 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

173 

174 

# Runtime class of `dict.values()` views, used for `isinstance` checks below.
DictValues: type[Any] = type({}.values())


def iter_contained_typevars(v: Any) -> Iterator[TypeVar]:
    """Walk `v` and everything nested inside it, yielding each `TypeVar` encountered.

    This exists as an alternative to reading the `__parameters__` attribute of a GenericAlias directly,
    since `__parameters__` of (nested) generic BaseModel subclasses won't show up in that list.
    """
    if isinstance(v, TypeVar):
        yield v
        return

    if is_model_class(v):
        # Models carry their own parameter list; no need to descend further.
        yield from v.__pydantic_generic_metadata__['parameters']
        return

    # Containers are walked item by item; anything else is walked through its type arguments.
    children = v if isinstance(v, (DictValues, list)) else get_args(v)
    for child in children:
        yield from iter_contained_typevars(child)

195 

196 

def get_args(v: Any) -> Any:
    """Return the type arguments of `v`.

    When `v` carries non-empty `__pydantic_generic_metadata__`, its `'args'` entry is returned;
    otherwise the lookup is delegated to `typing_extensions.get_args`.
    """
    metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    return metadata.get('args') if metadata else typing_extensions.get_args(v)

202 

203 

def get_origin(v: Any) -> Any:
    """Return the generic origin of `v`.

    When `v` carries non-empty `__pydantic_generic_metadata__`, its `'origin'` entry is returned;
    otherwise the lookup is delegated to `typing_extensions.get_origin`.
    """
    metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    return metadata.get('origin') if metadata else typing_extensions.get_origin(v)

209 

210 

def get_standard_typevars_map(cls: Any) -> dict[TypeVar, Any] | None:
    """Pair up a generic type's typevars with its parametrization (if present), producing a dictionary
    compatible with the `replace_types` function. Specifically, this works with standard typing generics
    and typing._GenericAlias.

    Returns `None` when `cls` has no origin, or when that origin has no `__parameters__`.
    """
    origin = get_origin(cls)
    if origin is None or not hasattr(origin, '__parameters__'):
        return None

    # At this point cls is a _GenericAlias and origin is the generic type it was built from,
    # so both cls.__args__ and origin.__parameters__ are safe to read.
    type_arguments: tuple[Any, ...] = cls.__args__  # type: ignore
    type_parameters: tuple[TypeVar, ...] = origin.__parameters__
    return dict(zip(type_parameters, type_arguments))

226 

227 

def get_model_typevars_map(cls: type[BaseModel]) -> dict[TypeVar, Any]:
    """Pair up a generic BaseModel's typevars with its concrete parametrization (if present), producing
    a dictionary compatible with the `replace_types` function.

    Since BaseModel.__class_getitem__ does not produce a typing._GenericAlias, and the BaseModel generic info is
    stored in the __pydantic_generic_metadata__ attribute, we need special handling here.
    """
    # TODO: This could be unified with `get_standard_typevars_map` if we stored the generic metadata
    #   in the __origin__, __args__, and __parameters__ attributes of the model.
    metadata = cls.__pydantic_generic_metadata__
    origin = metadata['origin']
    args = metadata['args']
    # With no args there is nothing to map, so `iter_contained_typevars` is skipped entirely.
    return dict(zip(iter_contained_typevars(origin), args)) if args else {}

244 

245 

def replace_types(type_: Any, type_map: Mapping[TypeVar, Any] | None) -> Any:
    """Return type with all occurrences of `type_map` keys recursively replaced with their values.

    Args:
        type_: The class or generic alias.
        type_map: Mapping from `TypeVar` instance to concrete types.

    Returns:
        A new type representing the basic structure of `type_` with all
        `type_map` keys recursively replaced. When nothing needs replacing,
        `type_` itself is returned unchanged.

    Example:
        ```python
        from typing import List, Union

        from pydantic._internal._generics import replace_types

        replace_types(tuple[str, Union[List[str], float]], {str: int})
        #> tuple[int, Union[List[int], float]]
        ```
    """
    # An empty or missing map cannot replace anything.
    if not type_map:
        return type_

    type_args = get_args(type_)
    origin_type = get_origin(type_)

    if typing_objects.is_annotated(origin_type):
        # Only the annotated type is substituted; the metadata items are carried over untouched.
        annotated_type, *annotations = type_args
        annotated_type = replace_types(annotated_type, type_map)
        # TODO remove parentheses when we drop support for Python 3.10:
        return Annotated[(annotated_type, *annotations)]

    # Having type args is a good indicator that this is a typing special form
    # instance or a generic alias of some sort.
    if type_args:
        resolved_type_args = tuple(replace_types(arg, type_map) for arg in type_args)
        if all_identical(type_args, resolved_type_args):
            # If all arguments are the same, there is no need to modify the
            # type or create a new object at all
            return type_

        if (
            origin_type is not None
            and isinstance(type_, _typing_extra.typing_base)
            and not isinstance(origin_type, _typing_extra.typing_base)
            and getattr(type_, '_name', None) is not None
        ):
            # In python < 3.9 generic aliases don't exist so any of these like `list`,
            # `type` or `collections.abc.Callable` need to be translated.
            # See: https://www.python.org/dev/peps/pep-0585
            origin_type = getattr(typing, type_._name)
        assert origin_type is not None

        if is_union_origin(origin_type):
            if any(typing_objects.is_any(arg) for arg in resolved_type_args):
                # `Any | T` ~ `Any`:
                resolved_type_args = (Any,)
            # `Never | T` ~ `T`:
            resolved_type_args = tuple(
                arg
                for arg in resolved_type_args
                if not (typing_objects.is_noreturn(arg) or typing_objects.is_never(arg))
            )

        # PEP-604 syntax (Ex.: list | str) is represented with a types.UnionType object that does not have __getitem__.
        # We also cannot use isinstance() since we have to compare types.
        if sys.version_info >= (3, 10) and origin_type is types.UnionType:
            return _UnionGenericAlias(origin_type, resolved_type_args)
        # NotRequired[T] and Required[T] don't support tuple type resolved_type_args, hence the condition below
        return origin_type[resolved_type_args[0] if len(resolved_type_args) == 1 else resolved_type_args]

    # We handle pydantic generic models separately as they don't have the same
    # semantics as "typing" classes or generic aliases

    if not origin_type and is_model_class(type_):
        parameters = type_.__pydantic_generic_metadata__['parameters']
        if not parameters:
            return type_
        resolved_type_args = tuple(replace_types(t, type_map) for t in parameters)
        if all_identical(parameters, resolved_type_args):
            return type_
        # Re-parametrize the model with the substituted parameters.
        return type_[resolved_type_args]

    # Handle special case for typehints that can have lists as arguments.
    # `typing.Callable[[int, str], int]` is an example for this.
    if isinstance(type_, list):
        resolved_list = [replace_types(element, type_map) for element in type_]
        if all_identical(type_, resolved_list):
            return type_
        return resolved_list

    # If all else fails, we try to resolve the type directly and otherwise just
    # return the input with no modifications.
    return type_map.get(type_, type_)

341 

342 

def map_generic_model_arguments(cls: type[BaseModel], args: tuple[Any, ...]) -> dict[TypeVar, Any]:
    """Build the mapping from a generic model's parameters to the arguments given when parametrizing it.

    Parameters left without an argument fall back to their default, when they have one.

    Raises:
        TypeError: If the number of arguments does not match the parameters (i.e. if providing too few or too many arguments).

    Example:
        ```python {test="skip" lint="skip"}
        class Model[T, U, V = int](BaseModel): ...

        map_generic_model_arguments(Model, (str, bytes))
        #> {T: str, U: bytes, V: int}

        map_generic_model_arguments(Model, (str,))
        #> TypeError: Too few arguments for <class '__main__.Model'>; actual 1, expected at least 2

        map_generic_model_arguments(Model, (str, bytes, int, complex))
        #> TypeError: Too many arguments for <class '__main__.Model'>; actual 4, expected 3
        ```

    Note:
        This function is analogous to the private `typing._check_generic_specialization` function.
    """
    parameters = cls.__pydantic_generic_metadata__['parameters']
    expected_len = len(parameters)
    typevars_map: dict[TypeVar, Any] = {}

    # Private sentinel so that any real argument value (including `None`) is distinguishable from "absent".
    _missing = object()
    for parameter, argument in zip_longest(parameters, args, fillvalue=_missing):
        if parameter is _missing:
            raise TypeError(f'Too many arguments for {cls}; actual {len(args)}, expected {expected_len}')

        param = typing.cast(TypeVar, parameter)
        if argument is not _missing:
            typevars_map[param] = argument
            continue

        # No argument was supplied for this parameter: it must have a default.
        try:
            has_default = param.has_default()
        except AttributeError:
            # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13.
            has_default = False

        if not has_default:
            defaulted_count = sum(hasattr(p, 'has_default') and p.has_default() for p in parameters)
            expected_len -= defaulted_count
            raise TypeError(f'Too few arguments for {cls}; actual {len(args)}, expected at least {expected_len}')

        # The default might refer to other type parameters. For an example, see:
        # https://typing.readthedocs.io/en/latest/spec/generics.html#type-parameters-as-parameters-to-generics
        typevars_map[param] = replace_types(param.__default__, typevars_map)

    return typevars_map

394 

395 

# Context-local set of type refs currently being built; `None` outside of any generic build.
_generic_recursion_cache: ContextVar[set[str] | None] = ContextVar('_generic_recursion_cache', default=None)


@contextmanager
def generic_recursion_self_type(
    origin: type[BaseModel], args: tuple[Any, ...]
) -> Iterator[PydanticRecursiveRef | None]:
    """This contextmanager should be placed around the recursive calls used to build a generic type,
    and accept as arguments the generic origin type and the type arguments being passed to it.

    If the same origin and arguments are observed twice, it implies that a self-reference placeholder
    can be used while building the core schema, and will produce a schema_ref that will be valid in the
    final parent schema.

    Yields:
        A `PydanticRecursiveRef` when this origin/args pair is already being built further up
        the stack, otherwise `None`.
    """
    previously_seen_type_refs = _generic_recursion_cache.get()
    if previously_seen_type_refs is None:
        # Outermost call: create the shared set and remember the token so it can be undone on exit.
        previously_seen_type_refs = set()
        token = _generic_recursion_cache.set(previously_seen_type_refs)
    else:
        token = None

    try:
        type_ref = get_type_ref(origin, args_override=args)
        if type_ref in previously_seen_type_refs:
            yield PydanticRecursiveRef(type_ref=type_ref)
        else:
            previously_seen_type_refs.add(type_ref)
            try:
                yield
            finally:
                # Runs even when the `with` body raises, so a failed nested build cannot leave
                # a stale ref behind in the set shared with the enclosing builds.
                previously_seen_type_refs.remove(type_ref)
    finally:
        if token is not None:
            _generic_recursion_cache.reset(token)

429 

430 

def recursively_defined_type_refs() -> set[str]:
    """Return a snapshot of the type refs currently recorded in the generic recursion cache.

    The result is always a fresh set (empty when not inside a generic recursion), so callers
    cannot modify the cache through it.
    """
    return set(_generic_recursion_cache.get() or ())

437 

438 

def get_cached_generic_type_early(parent: type[BaseModel], typevar_values: Any) -> type[BaseModel] | None:
    """Look up a parametrized model using the cheap "early" cache key.

    The use of a two-stage cache lookup approach was necessary to have the highest performance possible for
    repeated calls to `__class_getitem__` on generic types (which may happen in tighter loops during runtime),
    while still ensuring that certain alternative parametrizations ultimately resolve to the same type.

    As a concrete example, this approach was necessary to make Model[List[T]][int] equal to Model[List[int]].
    The approach could be modified to not use two different cache keys at different points, but the
    _early_cache_key is optimized to be as quick to compute as possible (for repeated-access speed), and the
    _late_cache_key is optimized to be as "correct" as possible, so that two types that will ultimately be the
    same after resolving the type arguments will always produce cache hits.

    If we wanted to move to only using a single cache key per type, we would either need to always use the
    slower/more computationally intensive logic associated with _late_cache_key, or would need to accept
    that Model[List[T]][int] is a different type than Model[List[int]]. Because we rely on subclass relationships
    during validation, I think it is worthwhile to ensure that types that are functionally equivalent are actually
    equal.
    """
    cache = _GENERIC_TYPES_CACHE.get()
    if cache is None:
        # First lookup in this context: create the cache and publish it.
        cache = GenericTypesCache()
        _GENERIC_TYPES_CACHE.set(cache)
    lookup_key = _early_cache_key(parent, typevar_values)
    return cache.get(lookup_key)

461 

462 

def get_cached_generic_type_late(
    parent: type[BaseModel], typevar_values: Any, origin: type[BaseModel], args: tuple[Any, ...]
) -> type[BaseModel] | None:
    """Look up a parametrized model using the "late" cache key.

    On a hit, the type is also re-registered via `set_cached_generic_type`, so the early-key lookup
    will find it next time. See the docstring of `get_cached_generic_type_early` for more information
    about the two-stage cache lookup.
    """
    cache = _GENERIC_TYPES_CACHE.get()
    if cache is None:  # pragma: no cover (early cache is guaranteed to run first and initialize the cache)
        cache = GenericTypesCache()
        _GENERIC_TYPES_CACHE.set(cache)

    hit = cache.get(_late_cache_key(origin, args, typevar_values))
    if hit is None:
        return None

    set_cached_generic_type(parent, typevar_values, hit, origin, args)
    return hit

477 

478 

def set_cached_generic_type(
    parent: type[BaseModel],
    typevar_values: tuple[Any, ...],
    type_: type[BaseModel],
    origin: type[BaseModel] | None = None,
    args: tuple[Any, ...] | None = None,
) -> None:
    """Register `type_` in the generic types cache under every key it may later be looked up by.

    See the docstring of `get_cached_generic_type_early` for more information about why items are
    cached with two different keys.
    """
    cache = _GENERIC_TYPES_CACHE.get()
    if cache is None:  # pragma: no cover (cache lookup is guaranteed to run first and initialize the cache)
        cache = GenericTypesCache()
        _GENERIC_TYPES_CACHE.set(cache)

    cache[_early_cache_key(parent, typevar_values)] = type_

    # A single value is also registered unwrapped, i.e. without the surrounding 1-tuple.
    if len(typevar_values) == 1:
        (only_value,) = typevar_values
        cache[_early_cache_key(parent, only_value)] = type_

    # The late key is only stored when both origin and args are provided (and non-empty).
    if origin and args:
        cache[_late_cache_key(origin, args, typevar_values)] = type_

500 

501 

502def _union_orderings_key(typevar_values: Any) -> Any: 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

503 """This is intended to help differentiate between Union types with the same arguments in different order. 

504 

505 Thanks to caching internal to the `typing` module, it is not possible to distinguish between 

506 List[Union[int, float]] and List[Union[float, int]] (and similarly for other "parent" origins besides List) 

507 because `typing` considers Union[int, float] to be equal to Union[float, int]. 

508 

509 However, you _can_ distinguish between (top-level) Union[int, float] vs. Union[float, int]. 

510 Because we parse items as the first Union type that is successful, we get slightly more consistent behavior 

511 if we make an effort to distinguish the ordering of items in a union. It would be best if we could _always_ 

512 get the exact-correct order of items in the union, but that would require a change to the `typing` module itself. 

513 (See https://github.com/python/cpython/issues/86483 for reference.) 

514 """ 

515 if isinstance(typevar_values, tuple): 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

516 args_data = [] 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

517 for value in typevar_values: 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

518 args_data.append(_union_orderings_key(value)) 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

519 return tuple(args_data) 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

520 elif typing_objects.is_union(typing_extensions.get_origin(typevar_values)): 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

521 return get_args(typevar_values) 1bcdefghiABCjawxklmnopDEFyzqrstuvGHI

522 else: 

523 return () 1bcdefghiABCjawxklmnopDEFJKLMNOPyzqrstuvGHI

524 

525 

def _early_cache_key(cls: type[BaseModel], typevar_values: Any) -> GenericTypesCacheKey:
    """Build the cache key that is cheapest to compute, for fast lookups of cached types.

    Note that this is overly simplistic, and it's possible that two different cls/typevar_values
    inputs would ultimately result in the same type being created in BaseModel.__class_getitem__.
    To handle this, we have a fallback _late_cache_key that is checked later if the _early_cache_key
    lookup fails, and should result in a cache hit _precisely_ when the inputs to __class_getitem__
    would result in the same type.
    """
    union_orderings = _union_orderings_key(typevar_values)
    return (cls, typevar_values, union_orderings)

536 

537 

def _late_cache_key(origin: type[BaseModel], args: tuple[Any, ...], typevar_values: Any) -> GenericTypesCacheKey:
    """Build the cache key used later in type creation, once the exact args to be passed are known.

    If it turns out that a different set of inputs to __class_getitem__ resulted in the same inputs
    to the generic type creation process, we can still return the cached type, and update the cache
    with the _early_cache_key as well.
    """
    # The _union_orderings_key is placed at the start here to ensure there cannot be a collision with an
    # _early_cache_key, as that function will always produce a BaseModel subclass as the first item in the key,
    # whereas this function will always produce a tuple as the first item in the key.
    union_orderings = _union_orderings_key(typevar_values)
    return (union_orderings, origin, args)