Coverage for pydantic/_internal/_generics.py: 93.56%

232 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-22 09:30 +0000

1from __future__ import annotations 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

2 

3import operator 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

4import sys 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

5import types 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

6import typing 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

7from collections import ChainMap 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

8from collections.abc import Iterator, Mapping 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

9from contextlib import contextmanager 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

10from contextvars import ContextVar 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

11from functools import reduce 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

12from itertools import zip_longest 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

13from types import prepare_class 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

14from typing import TYPE_CHECKING, Annotated, Any, TypeVar 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

15from weakref import WeakValueDictionary 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

16 

17import typing_extensions 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

18from typing_inspection import typing_objects 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

19from typing_inspection.introspection import is_union_origin 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

20 

21from . import _typing_extra 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

22from ._core_utils import get_type_ref 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

23from ._forward_ref import PydanticRecursiveRef 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

24from ._utils import all_identical, is_model_class 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

25 

26if TYPE_CHECKING: 1abefghijABCDMdcklmnopqrEFGHNPstuvwxyzIJKLO

27 from ..main import BaseModel 

28 

# Shape of the keys stored in the generic types cache: a 3-tuple whose last slot is a tuple of any length.
# Both `_early_cache_key` and `_late_cache_key` are annotated as returning this type.
GenericTypesCacheKey = tuple[Any, Any, tuple[Any, ...]]

30 

# Note: We want to remove LimitedDict, but to do this, we'd need to improve the handling of generics caching.
# Right now, to handle recursive generics, some types must remain cached for brief periods without references.
# By chaining the WeakValueDictionary with a LimitedDict, we have a way to retain caching for all types with
# references, while also retaining a limited number of types even without references. This is generally enough
# to build specific recursive generic models without losing required items out of the cache.

36 

KT = TypeVar('KT')
VT = TypeVar('VT')
_LIMITED_DICT_SIZE = 100


class LimitedDict(dict[KT, VT]):
    """A `dict` that trims itself when item assignment pushes it past `size_limit`.

    Only `__setitem__` is intercepted, so other ways of adding entries are not limited here.
    Entries are removed from the front of the dict's iteration order, i.e. the keys that
    were inserted first.

    Args:
        size_limit: Maximum number of entries tolerated before trimming kicks in.
    """

    def __init__(self, size_limit: int = _LIMITED_DICT_SIZE) -> None:
        self.size_limit = size_limit
        super().__init__()

    def __setitem__(self, key: KT, value: VT, /) -> None:
        super().__setitem__(key, value)
        if len(self) > self.size_limit:
            # Remove the overflow plus an extra tenth of the limit, so that the very next
            # insertion does not immediately trigger another trim.
            excess = len(self) - self.size_limit + self.size_limit // 10
            # Snapshot the keys first: the dict cannot be mutated while being iterated.
            to_remove = list(self)[:excess]
            for k in to_remove:
                del self[k]

54 

55 

# weak dictionaries allow the dynamically created parametrized versions of generic models to get collected
# once they are no longer referenced by the caller.
GenericTypesCache = WeakValueDictionary[GenericTypesCacheKey, 'type[BaseModel]']

59 

if TYPE_CHECKING:
    # Type checkers only see a parametrized stub; the runtime implementation is in the `else` branch.

    class DeepChainMap(ChainMap[KT, VT]):  # type: ignore
        ...

else:

    class DeepChainMap(ChainMap):
        """Variant of ChainMap that allows direct updates to inner scopes.

        Taken from https://docs.python.org/3/library/collections.html#collections.ChainMap,
        with some light modifications for this use case: writes, deletions and `clear`
        are applied to every mapping in `self.maps`, not just the first one.
        """

        def clear(self) -> None:
            """Empty every underlying mapping."""
            for mapping in self.maps:
                mapping.clear()

        def __setitem__(self, key: KT, value: VT) -> None:
            """Store `value` under `key` in every underlying mapping."""
            for mapping in self.maps:
                mapping[key] = value

        def __delitem__(self, key: KT) -> None:
            """Delete `key` from every underlying mapping that contains it.

            Raises:
                KeyError: If none of the underlying mappings contains `key`.
            """
            hit = False
            for mapping in self.maps:
                if key in mapping:
                    del mapping[key]
                    hit = True
            if not hit:
                raise KeyError(key)

90 

91 

# Despite the fact that LimitedDict _seems_ no longer necessary, I'm very nervous to actually remove it
# and discover later on that we need to re-add all this infrastructure...
# _GENERIC_TYPES_CACHE = DeepChainMap(GenericTypesCache(), LimitedDict())

# Holds the generic types cache for the current context. It starts out as `None`; the
# cache helpers in this module create a `GenericTypesCache` on first use.
_GENERIC_TYPES_CACHE: ContextVar[GenericTypesCache | None] = ContextVar('_GENERIC_TYPES_CACHE', default=None)

97 

98 

class PydanticGenericMetadata(typing_extensions.TypedDict):
    """Generic parametrization info read from an object's `__pydantic_generic_metadata__` attribute."""

    origin: type[BaseModel] | None  # analogous to typing._GenericAlias.__origin__
    args: tuple[Any, ...]  # analogous to typing._GenericAlias.__args__
    parameters: tuple[TypeVar, ...]  # analogous to typing.Generic.__parameters__

103 

104 

def create_generic_submodel(
    model_name: str, origin: type[BaseModel], args: tuple[Any, ...], params: tuple[Any, ...]
) -> type[BaseModel]:
    """Dynamically create a submodel of a provided (generic) BaseModel.

    This is used when producing concrete parametrizations of generic models. This function
    only *creates* the new subclass; the schema/validators/serialization must be updated to
    reflect a concrete parametrization elsewhere.

    Args:
        model_name: The name of the newly created model.
        origin: The base class for the new model to inherit from.
        args: A tuple of generic metadata arguments.
        params: A tuple of generic metadata parameters.

    Returns:
        The created submodel.
    """
    namespace: dict[str, Any] = {'__module__': origin.__module__}
    bases = (origin,)
    # Let `prepare_class` pick the metaclass and prepare the class namespace for these bases.
    meta, ns, kwds = prepare_class(model_name, bases)
    namespace.update(ns)
    created_model = meta(
        model_name,
        bases,
        namespace,
        __pydantic_generic_metadata__={
            'origin': origin,
            'args': args,
            'parameters': params,
        },
        __pydantic_reset_parent_namespace__=False,
        **kwds,
    )

    # Only the "called at module level" flag is needed; the module name is discarded.
    _, called_globally = _get_caller_frame_info(depth=3)
    if called_globally:  # create global reference and therefore allow pickling
        object_by_reference = None
        reference_name = model_name
        reference_module_globals = sys.modules[created_model.__module__].__dict__
        # `setdefault` returns whatever already lives under the name, so keep appending
        # underscores until the name is either free or already bound to this very model.
        while object_by_reference is not created_model:
            object_by_reference = reference_module_globals.setdefault(reference_name, created_model)
            reference_name += '_'

    return created_model

150 

151 

def _get_caller_frame_info(depth: int = 2) -> tuple[str | None, bool]:
    """Used inside a function to check whether it was called globally.

    Args:
        depth: How many frames above this function's own frame to inspect.

    Returns:
        A tuple containing `module_name` and `called_globally`. `module_name` is the
        `__name__` found in the inspected frame's globals (or `None` if absent), and
        `called_globally` is `True` when that frame's locals *are* its globals.
        `(None, False)` is returned when `sys._getframe` is unavailable.

    Raises:
        RuntimeError: If the function is not called inside a function.
    """
    try:
        previous_caller_frame = sys._getframe(depth)
    except ValueError as e:
        # `depth` is deeper than the current call stack.
        raise RuntimeError('This function must be used inside another function') from e
    except AttributeError:  # sys module does not have _getframe function, so there's nothing we can do about it
        return None, False
    frame_globals = previous_caller_frame.f_globals
    return frame_globals.get('__name__'), previous_caller_frame.f_locals is frame_globals

172 

173 

# Runtime class of the view object returned by `dict.values()`.
DictValues: type[Any] = {}.values().__class__


def iter_contained_typevars(v: Any) -> Iterator[TypeVar]:
    """Recursively iterate through all subtypes and type args of `v` and yield any typevars that are found.

    This is intended as an alternative to directly accessing the `__parameters__` attribute of a GenericAlias,
    since __parameters__ of (nested) generic BaseModel subclasses won't show up in that list.

    Args:
        v: A type variable, a model class, a `dict.values()` view or list of types, or any
            other object whose type arguments should be searched.

    Yields:
        Each `TypeVar` encountered, in traversal order. No de-duplication is performed here.
    """
    if isinstance(v, TypeVar):
        yield v
    elif is_model_class(v):
        # Models record their own (still unbound) parameters in their generic metadata.
        yield from v.__pydantic_generic_metadata__['parameters']
    elif isinstance(v, (DictValues, list)):
        for var in v:
            yield from iter_contained_typevars(var)
    else:
        args = get_args(v)
        for arg in args:
            yield from iter_contained_typevars(arg)

194 

195 

def get_args(v: Any) -> Any:
    """Return the type arguments of `v`.

    If `v` has a truthy `__pydantic_generic_metadata__` attribute, its `'args'` entry is
    returned; otherwise the call is delegated to `typing_extensions.get_args`.
    """
    pydantic_generic_metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    if pydantic_generic_metadata:
        return pydantic_generic_metadata.get('args')
    return typing_extensions.get_args(v)

201 

202 

def get_origin(v: Any) -> Any:
    """Return the origin of `v`.

    If `v` has a truthy `__pydantic_generic_metadata__` attribute, its `'origin'` entry is
    returned; otherwise the call is delegated to `typing_extensions.get_origin`.
    """
    pydantic_generic_metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None)
    if pydantic_generic_metadata:
        return pydantic_generic_metadata.get('origin')
    return typing_extensions.get_origin(v)

208 

209 

def get_standard_typevars_map(cls: Any) -> dict[TypeVar, Any] | None:
    """Package a generic type's typevars and parametrization (if present) into a dictionary compatible with the
    `replace_types` function. Specifically, this works with standard typing generics and typing._GenericAlias.

    Returns:
        A mapping from the origin's `__parameters__` to `cls.__args__`, or `None` when `cls`
        has no origin or its origin has no `__parameters__` attribute.
    """
    origin = get_origin(cls)
    if origin is None:
        return None
    if not hasattr(origin, '__parameters__'):
        return None

    # In this case, we know that cls is a _GenericAlias, and origin is the generic type
    # So it is safe to access cls.__args__ and origin.__parameters__
    args: tuple[Any, ...] = cls.__args__  # type: ignore
    parameters: tuple[TypeVar, ...] = origin.__parameters__
    return dict(zip(parameters, args))

225 

226 

def get_model_typevars_map(cls: type[BaseModel]) -> dict[TypeVar, Any]:
    """Package a generic BaseModel's typevars and concrete parametrization (if present) into a dictionary compatible
    with the `replace_types` function.

    Since BaseModel.__class_getitem__ does not produce a typing._GenericAlias, and the BaseModel generic info is
    stored in the __pydantic_generic_metadata__ attribute, we need special handling here.

    Returns:
        A mapping from the type variables found on the model's origin to the model's `args`;
        an empty dict when the model has no `args`.
    """
    # TODO: This could be unified with `get_standard_typevars_map` if we stored the generic metadata
    #   in the __origin__, __args__, and __parameters__ attributes of the model.
    generic_metadata = cls.__pydantic_generic_metadata__
    origin = generic_metadata['origin']
    args = generic_metadata['args']
    if not args:
        # No need to go into `iter_contained_typevars`:
        return {}
    return dict(zip(iter_contained_typevars(origin), args))

243 

244 

def replace_types(type_: Any, type_map: Mapping[TypeVar, Any] | None) -> Any:
    """Return type with all occurrences of `type_map` keys recursively replaced with their values.

    Args:
        type_: The class or generic alias.
        type_map: Mapping from `TypeVar` instance to concrete types.

    Returns:
        A new type representing the basic structure of `type_` with all
        `type_map` keys recursively replaced. `type_` itself is returned
        unchanged when `type_map` is empty or when nothing was replaced.

    Example:
        ```python
        from typing import List, Union

        from pydantic._internal._generics import replace_types

        replace_types(tuple[str, Union[List[str], float]], {str: int})
        #> tuple[int, Union[List[int], float]]
        ```
    """
    if not type_map:
        return type_

    type_args = get_args(type_)
    origin_type = get_origin(type_)

    if typing_objects.is_annotated(origin_type):
        # Only the annotated type is substituted; the metadata items are passed through untouched.
        annotated_type, *annotations = type_args
        annotated_type = replace_types(annotated_type, type_map)
        # TODO remove parentheses when we drop support for Python 3.10:
        return Annotated[(annotated_type, *annotations)]

    # Having type args is a good indicator that this is a typing special form
    # instance or a generic alias of some sort.
    if type_args:
        resolved_type_args = tuple(replace_types(arg, type_map) for arg in type_args)
        if all_identical(type_args, resolved_type_args):
            # If all arguments are the same, there is no need to modify the
            # type or create a new object at all
            return type_

        if (
            origin_type is not None
            and isinstance(type_, _typing_extra.typing_base)
            and not isinstance(origin_type, _typing_extra.typing_base)
            and getattr(type_, '_name', None) is not None
        ):
            # In python < 3.9 generic aliases don't exist so any of these like `list`,
            # `type` or `collections.abc.Callable` need to be translated.
            # See: https://www.python.org/dev/peps/pep-0585
            origin_type = getattr(typing, type_._name)
        assert origin_type is not None

        if is_union_origin(origin_type):
            if any(typing_objects.is_any(arg) for arg in resolved_type_args):
                # `Any | T` ~ `Any`:
                resolved_type_args = (Any,)
            # `Never | T` ~ `T`:
            resolved_type_args = tuple(
                arg
                for arg in resolved_type_args
                if not (typing_objects.is_noreturn(arg) or typing_objects.is_never(arg))
            )

        # PEP-604 syntax (Ex.: list | str) is represented with a types.UnionType object that does not have __getitem__.
        # We also cannot use isinstance() since we have to compare types.
        if sys.version_info >= (3, 10) and origin_type is types.UnionType:
            return reduce(operator.or_, resolved_type_args)
        # NotRequired[T] and Required[T] don't support tuple type resolved_type_args, hence the condition below
        return origin_type[resolved_type_args[0] if len(resolved_type_args) == 1 else resolved_type_args]

    # We handle pydantic generic models separately as they don't have the same
    # semantics as "typing" classes or generic aliases

    if not origin_type and is_model_class(type_):
        parameters = type_.__pydantic_generic_metadata__['parameters']
        if not parameters:
            return type_
        resolved_type_args = tuple(replace_types(t, type_map) for t in parameters)
        if all_identical(parameters, resolved_type_args):
            return type_
        return type_[resolved_type_args]

    # Handle special case for typehints that can have lists as arguments.
    # `typing.Callable[[int, str], int]` is an example for this.
    if isinstance(type_, list):
        resolved_list = [replace_types(element, type_map) for element in type_]
        if all_identical(type_, resolved_list):
            return type_
        return resolved_list

    # If all else fails, we try to resolve the type directly and otherwise just
    # return the input with no modifications.
    return type_map.get(type_, type_)

340 

341 

def map_generic_model_arguments(cls: type[BaseModel], args: tuple[Any, ...]) -> dict[TypeVar, Any]:
    """Return a mapping between the parameters of a generic model and the provided arguments during parameterization.

    Args:
        cls: The generic model whose `__pydantic_generic_metadata__['parameters']` are being bound.
        args: The arguments supplied for those parameters, in order.

    Returns:
        A dict mapping each parameter to its argument, or to its (resolved) default when no
        argument was supplied for it.

    Raises:
        TypeError: If the number of arguments does not match the parameters (i.e. if providing too few or too many arguments).

    Example:
        ```python {test="skip" lint="skip"}
        class Model[T, U, V = int](BaseModel): ...

        map_generic_model_arguments(Model, (str, bytes))
        #> {T: str, U: bytes, V: int}

        map_generic_model_arguments(Model, (str,))
        #> TypeError: Too few arguments for <class '__main__.Model'>; actual 1, expected at least 2

        map_generic_model_arguments(Model, (str, bytes, int, complex))
        #> TypeError: Too many arguments for <class '__main__.Model'>; actual 4, expected 3
        ```

    Note:
        This function is analogous to the private `typing._check_generic_specialization` function.
    """
    parameters = cls.__pydantic_generic_metadata__['parameters']
    expected_len = len(parameters)
    typevars_map: dict[TypeVar, Any] = {}

    # Private sentinel so that `None` (or any other value) stays usable as a real argument.
    _missing = object()
    for parameter, argument in zip_longest(parameters, args, fillvalue=_missing):
        if parameter is _missing:
            raise TypeError(f'Too many arguments for {cls}; actual {len(args)}, expected {expected_len}')

        if argument is _missing:
            param = typing.cast(TypeVar, parameter)
            try:
                has_default = param.has_default()
            except AttributeError:
                # Happens if using `typing.TypeVar` (and not `typing_extensions`) on Python < 3.13.
                has_default = False
            if has_default:
                # The default might refer to other type parameters. For an example, see:
                # https://typing.readthedocs.io/en/latest/spec/generics.html#type-parameters-as-parameters-to-generics
                typevars_map[param] = replace_types(param.__default__, typevars_map)
            else:
                # Parameters with a default are optional, so leave them out of the reported minimum.
                expected_len -= sum(hasattr(p, 'has_default') and p.has_default() for p in parameters)
                raise TypeError(f'Too few arguments for {cls}; actual {len(args)}, expected at least {expected_len}')
        else:
            param = typing.cast(TypeVar, parameter)
            typevars_map[param] = argument

    return typevars_map

393 

394 

# Type refs of the generic parametrizations currently being built in this context (`None` when idle).
_generic_recursion_cache: ContextVar[set[str] | None] = ContextVar('_generic_recursion_cache', default=None)


@contextmanager
def generic_recursion_self_type(
    origin: type[BaseModel], args: tuple[Any, ...]
) -> Iterator[PydanticRecursiveRef | None]:
    """This contextmanager should be placed around the recursive calls used to build a generic type,
    and accept as arguments the generic origin type and the type arguments being passed to it.

    If the same origin and arguments are observed twice, it implies that a self-reference placeholder
    can be used while building the core schema, and will produce a schema_ref that will be valid in the
    final parent schema.

    Yields:
        A `PydanticRecursiveRef` when this origin/args combination is already being built
        further up the stack, otherwise `None`.
    """
    previously_seen_type_refs = _generic_recursion_cache.get()
    if previously_seen_type_refs is None:
        # Outermost call: create the tracking set and remember the token so it can be reset on exit.
        previously_seen_type_refs = set()
        token = _generic_recursion_cache.set(previously_seen_type_refs)
    else:
        token = None

    try:
        type_ref = get_type_ref(origin, args_override=args)
        if type_ref in previously_seen_type_refs:
            self_type = PydanticRecursiveRef(type_ref=type_ref)
            yield self_type
        else:
            previously_seen_type_refs.add(type_ref)
            yield
            previously_seen_type_refs.remove(type_ref)
    finally:
        if token:
            _generic_recursion_cache.reset(token)

428 

429 

def recursively_defined_type_refs() -> set[str]:
    """Return a snapshot of the type refs held in `_generic_recursion_cache`.

    Returns:
        A fresh set each time: empty when the context variable is unset or holds an empty
        set, otherwise a copy of the stored set.
    """
    visited = _generic_recursion_cache.get()
    if not visited:
        return set()  # not in a generic recursion, so there are no types

    return visited.copy()  # don't allow modifications

436 

437 

def get_cached_generic_type_early(parent: type[BaseModel], typevar_values: Any) -> type[BaseModel] | None:
    """The use of a two-stage cache lookup approach was necessary to have the highest performance possible for
    repeated calls to `__class_getitem__` on generic types (which may happen in tighter loops during runtime),
    while still ensuring that certain alternative parametrizations ultimately resolve to the same type.

    As a concrete example, this approach was necessary to make Model[List[T]][int] equal to Model[List[int]].
    The approach could be modified to not use two different cache keys at different points, but the
    _early_cache_key is optimized to be as quick to compute as possible (for repeated-access speed), and the
    _late_cache_key is optimized to be as "correct" as possible, so that two types that will ultimately be the
    same after resolving the type arguments will always produce cache hits.

    If we wanted to move to only using a single cache key per type, we would either need to always use the
    slower/more computationally intensive logic associated with _late_cache_key, or would need to accept
    that Model[List[T]][int] is a different type than Model[List[int]]. Because we rely on subclass relationships
    during validation, I think it is worthwhile to ensure that types that are functionally equivalent are actually
    equal.

    Returns:
        The type cached under the early key for `parent` and `typevar_values`, or `None` on a miss.
    """
    generic_types_cache = _GENERIC_TYPES_CACHE.get()
    if generic_types_cache is None:
        # First use in this context: create the cache and store it in the context variable.
        generic_types_cache = GenericTypesCache()
        _GENERIC_TYPES_CACHE.set(generic_types_cache)
    return generic_types_cache.get(_early_cache_key(parent, typevar_values))

460 

461 

def get_cached_generic_type_late(
    parent: type[BaseModel], typevar_values: Any, origin: type[BaseModel], args: tuple[Any, ...]
) -> type[BaseModel] | None:
    """See the docstring of `get_cached_generic_type_early` for more information about the two-stage cache lookup.

    Returns:
        The type cached under the late key for `origin`, `args` and `typevar_values`, or
        `None` on a miss.
    """
    generic_types_cache = _GENERIC_TYPES_CACHE.get()
    if (
        generic_types_cache is None
    ):  # pragma: no cover (early cache is guaranteed to run first and initialize the cache)
        generic_types_cache = GenericTypesCache()
        _GENERIC_TYPES_CACHE.set(generic_types_cache)
    cached = generic_types_cache.get(_late_cache_key(origin, args, typevar_values))
    if cached is not None:
        # Also register the hit under the early key(s) for `parent` / `typevar_values`.
        set_cached_generic_type(parent, typevar_values, cached, origin, args)
    return cached

476 

477 

def set_cached_generic_type(
    parent: type[BaseModel],
    typevar_values: tuple[Any, ...],
    type_: type[BaseModel],
    origin: type[BaseModel] | None = None,
    args: tuple[Any, ...] | None = None,
) -> None:
    """See the docstring of `get_cached_generic_type_early` for more information about why items are cached with
    two different keys.

    Args:
        parent: The class used to build the early cache key.
        typevar_values: The values used to build the early (and, if applicable, late) cache key.
        type_: The type to store.
        origin: Used for the late cache key; the late entry is only written when both
            `origin` and `args` are truthy.
        args: Used for the late cache key, together with `origin`.
    """
    generic_types_cache = _GENERIC_TYPES_CACHE.get()
    if (
        generic_types_cache is None
    ):  # pragma: no cover (cache lookup is guaranteed to run first and initialize the cache)
        generic_types_cache = GenericTypesCache()
        _GENERIC_TYPES_CACHE.set(generic_types_cache)
    generic_types_cache[_early_cache_key(parent, typevar_values)] = type_
    if len(typevar_values) == 1:
        # Also store under the bare value, so a lookup keyed by the unwrapped single
        # value hits just like one keyed by the 1-tuple.
        generic_types_cache[_early_cache_key(parent, typevar_values[0])] = type_
    if origin and args:
        generic_types_cache[_late_cache_key(origin, args, typevar_values)] = type_

499 

500 

def _union_orderings_key(typevar_values: Any) -> Any:
    """This is intended to help differentiate between Union types with the same arguments in different order.

    Thanks to caching internal to the `typing` module, it is not possible to distinguish between
    List[Union[int, float]] and List[Union[float, int]] (and similarly for other "parent" origins besides List)
    because `typing` considers Union[int, float] to be equal to Union[float, int].

    However, you _can_ distinguish between (top-level) Union[int, float] vs. Union[float, int].
    Because we parse items as the first Union type that is successful, we get slightly more consistent behavior
    if we make an effort to distinguish the ordering of items in a union. It would be best if we could _always_
    get the exact-correct order of items in the union, but that would require a change to the `typing` module itself.
    (See https://github.com/python/cpython/issues/86483 for reference.)

    Returns:
        For a tuple, a tuple holding the key of each element; for a value whose origin is a
        union, that value's args; `()` for anything else.
    """
    if isinstance(typevar_values, tuple):
        return tuple(_union_orderings_key(value) for value in typevar_values)
    elif typing_objects.is_union(typing_extensions.get_origin(typevar_values)):
        return get_args(typevar_values)
    else:
        return ()

523 

524 

def _early_cache_key(cls: type[BaseModel], typevar_values: Any) -> GenericTypesCacheKey:
    """This is intended for minimal computational overhead during lookups of cached types.

    Note that this is overly simplistic, and it's possible that two different cls/typevar_values
    inputs would ultimately result in the same type being created in BaseModel.__class_getitem__.
    To handle this, we have a fallback _late_cache_key that is checked later if the _early_cache_key
    lookup fails, and should result in a cache hit _precisely_ when the inputs to __class_getitem__
    would result in the same type.

    Returns:
        The tuple `(cls, typevar_values, _union_orderings_key(typevar_values))`.
    """
    return cls, typevar_values, _union_orderings_key(typevar_values)

535 

536 

def _late_cache_key(origin: type[BaseModel], args: tuple[Any, ...], typevar_values: Any) -> GenericTypesCacheKey:
    """This is intended for use later in the process of creating a new type, when we have more information
    about the exact args that will be passed. If it turns out that a different set of inputs to
    __class_getitem__ resulted in the same inputs to the generic type creation process, we can still
    return the cached type, and update the cache with the _early_cache_key as well.

    Returns:
        The tuple `(_union_orderings_key(typevar_values), origin, args)`.
    """
    # The _union_orderings_key is placed at the start here to ensure there cannot be a collision with an
    # _early_cache_key, as that function will always produce a BaseModel subclass as the first item in the key,
    # whereas this function will always produce a tuple as the first item in the key.
    return _union_orderings_key(typevar_values), origin, args