Coverage for pydantic/_internal/_generics.py: 93.52%

218 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-03 19:29 +0000

1from __future__ import annotations 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

2 

3import sys 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

4import types 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

5import typing 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

6from collections import ChainMap 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

7from contextlib import contextmanager 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

8from contextvars import ContextVar 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

9from types import prepare_class 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

10from typing import TYPE_CHECKING, Any, Iterator, List, Mapping, MutableMapping, Tuple, TypeVar 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

11from weakref import WeakValueDictionary 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

12 

13import typing_extensions 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

14 

15from ._core_utils import get_type_ref 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

16from ._forward_ref import PydanticRecursiveRef 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

17from ._typing_extra import TypeVarType, typing_base 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

18from ._utils import all_identical, is_model_class 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

19 

20if sys.version_info >= (3, 10): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

21 from typing import _UnionGenericAlias # type: ignore[attr-defined] 1abcdefghijklmnopqMNOPQRSTrstuvwxy

22 

23if TYPE_CHECKING: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

24 from ..main import BaseModel 

25 

26GenericTypesCacheKey = Tuple[Any, Any, Tuple[Any, ...]] 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

27 

28# Note: We want to remove LimitedDict, but to do this, we'd need to improve the handling of generics caching. 

29# Right now, to handle recursive generics, we some types must remain cached for brief periods without references. 

30# By chaining the WeakValuesDict with a LimitedDict, we have a way to retain caching for all types with references, 

31# while also retaining a limited number of types even without references. This is generally enough to build 

32# specific recursive generic models without losing required items out of the cache. 

33 

34KT = TypeVar('KT') 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

35VT = TypeVar('VT') 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

36_LIMITED_DICT_SIZE = 100 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

37if TYPE_CHECKING: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

38 

39 class LimitedDict(dict, MutableMapping[KT, VT]): 

40 def __init__(self, size_limit: int = _LIMITED_DICT_SIZE): ... 

41 

42else: 

43 

44 class LimitedDict(dict): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

45 """Limit the size/length of a dict used for caching to avoid unlimited increase in memory usage. 

46 

47 Since the dict is ordered, and we always remove elements from the beginning, this is effectively a FIFO cache. 

48 """ 

49 

50 def __init__(self, size_limit: int = _LIMITED_DICT_SIZE): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

51 self.size_limit = size_limit 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

52 super().__init__() 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

53 

54 def __setitem__(self, key: Any, value: Any, /) -> None: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

55 super().__setitem__(key, value) 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

56 if len(self) > self.size_limit: 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

57 excess = len(self) - self.size_limit + self.size_limit // 10 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

58 to_remove = list(self.keys())[:excess] 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

59 for k in to_remove: 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

60 del self[k] 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

61 

62 

63# weak dictionaries allow the dynamically created parametrized versions of generic models to get collected 

64# once they are no longer referenced by the caller. 

65if sys.version_info >= (3, 9): # Typing for weak dictionaries available at 3.9 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

66 GenericTypesCache = WeakValueDictionary[GenericTypesCacheKey, 'type[BaseModel]'] 1BCabcdefghLiFGjklmnopqVMNOPQRSTJKrstuvwxy

67else: 

68 GenericTypesCache = WeakValueDictionary 1zADEUHI

69 

70if TYPE_CHECKING: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

71 

72 class DeepChainMap(ChainMap[KT, VT]): # type: ignore 

73 ... 

74 

75else: 

76 

77 class DeepChainMap(ChainMap): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

78 """Variant of ChainMap that allows direct updates to inner scopes. 

79 

80 Taken from https://docs.python.org/3/library/collections.html#collections.ChainMap, 

81 with some light modifications for this use case. 

82 """ 

83 

84 def clear(self) -> None: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

85 for mapping in self.maps: 

86 mapping.clear() 

87 

88 def __setitem__(self, key: KT, value: VT) -> None: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

89 for mapping in self.maps: 

90 mapping[key] = value 

91 

92 def __delitem__(self, key: KT) -> None: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

93 hit = False 

94 for mapping in self.maps: 

95 if key in mapping: 

96 del mapping[key] 

97 hit = True 

98 if not hit: 

99 raise KeyError(key) 

100 

101 

102# Despite the fact that LimitedDict _seems_ no longer necessary, I'm very nervous to actually remove it 

103# and discover later on that we need to re-add all this infrastructure... 

104# _GENERIC_TYPES_CACHE = DeepChainMap(GenericTypesCache(), LimitedDict()) 

105 

106_GENERIC_TYPES_CACHE = GenericTypesCache() 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

107 

108 

109class PydanticGenericMetadata(typing_extensions.TypedDict): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

110 origin: type[BaseModel] | None # analogous to typing._GenericAlias.__origin__ 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

111 args: tuple[Any, ...] # analogous to typing._GenericAlias.__args__ 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

112 parameters: tuple[type[Any], ...] # analogous to typing.Generic.__parameters__ 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

113 

114 

115def create_generic_submodel( 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

116 model_name: str, origin: type[BaseModel], args: tuple[Any, ...], params: tuple[Any, ...] 

117) -> type[BaseModel]: 

118 """Dynamically create a submodel of a provided (generic) BaseModel. 

119 

120 This is used when producing concrete parametrizations of generic models. This function 

121 only *creates* the new subclass; the schema/validators/serialization must be updated to 

122 reflect a concrete parametrization elsewhere. 

123 

124 Args: 

125 model_name: The name of the newly created model. 

126 origin: The base class for the new model to inherit from. 

127 args: A tuple of generic metadata arguments. 

128 params: A tuple of generic metadata parameters. 

129 

130 Returns: 

131 The created submodel. 

132 """ 

133 namespace: dict[str, Any] = {'__module__': origin.__module__} 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

134 bases = (origin,) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

135 meta, ns, kwds = prepare_class(model_name, bases) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

136 namespace.update(ns) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

137 created_model = meta( 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

138 model_name, 

139 bases, 

140 namespace, 

141 __pydantic_generic_metadata__={ 

142 'origin': origin, 

143 'args': args, 

144 'parameters': params, 

145 }, 

146 __pydantic_reset_parent_namespace__=False, 

147 **kwds, 

148 ) 

149 

150 model_module, called_globally = _get_caller_frame_info(depth=3) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

151 if called_globally: # create global reference and therefore allow pickling 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

152 object_by_reference = None 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

153 reference_name = model_name 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

154 reference_module_globals = sys.modules[created_model.__module__].__dict__ 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

155 while object_by_reference is not created_model: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

156 object_by_reference = reference_module_globals.setdefault(reference_name, created_model) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

157 reference_name += '_' 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

158 

159 return created_model 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

160 

161 

162def _get_caller_frame_info(depth: int = 2) -> tuple[str | None, bool]: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

163 """Used inside a function to check whether it was called globally. 

164 

165 Args: 

166 depth: The depth to get the frame. 

167 

168 Returns: 

169 A tuple contains `module_name` and `called_globally`. 

170 

171 Raises: 

172 RuntimeError: If the function is not called inside a function. 

173 """ 

174 try: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

175 previous_caller_frame = sys._getframe(depth) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

176 except ValueError as e: 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

177 raise RuntimeError('This function must be used inside another function') from e 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

178 except AttributeError: # sys module does not have _getframe function, so there's nothing we can do about it 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

179 return None, False 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

180 frame_globals = previous_caller_frame.f_globals 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

181 return frame_globals.get('__name__'), previous_caller_frame.f_locals is frame_globals 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

182 

183 

184DictValues: type[Any] = {}.values().__class__ 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

185 

186 

187def iter_contained_typevars(v: Any) -> Iterator[TypeVarType]: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

188 """Recursively iterate through all subtypes and type args of `v` and yield any typevars that are found. 

189 

190 This is inspired as an alternative to directly accessing the `__parameters__` attribute of a GenericAlias, 

191 since __parameters__ of (nested) generic BaseModel subclasses won't show up in that list. 

192 """ 

193 if isinstance(v, TypeVar): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

194 yield v 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

195 elif is_model_class(v): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

196 yield from v.__pydantic_generic_metadata__['parameters'] 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

197 elif isinstance(v, (DictValues, list)): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

198 for var in v: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

199 yield from iter_contained_typevars(var) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

200 else: 

201 args = get_args(v) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

202 for arg in args: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

203 yield from iter_contained_typevars(arg) 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

204 

205 

206def get_args(v: Any) -> Any: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

207 pydantic_generic_metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

208 if pydantic_generic_metadata: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

209 return pydantic_generic_metadata.get('args') 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

210 return typing_extensions.get_args(v) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

211 

212 

213def get_origin(v: Any) -> Any: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

214 pydantic_generic_metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

215 if pydantic_generic_metadata: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

216 return pydantic_generic_metadata.get('origin') 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

217 return typing_extensions.get_origin(v) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

218 

219 

220def get_standard_typevars_map(cls: type[Any]) -> dict[TypeVarType, Any] | None: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

221 """Package a generic type's typevars and parametrization (if present) into a dictionary compatible with the 

222 `replace_types` function. Specifically, this works with standard typing generics and typing._GenericAlias. 

223 """ 

224 origin = get_origin(cls) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

225 if origin is None: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

226 return None 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

227 if not hasattr(origin, '__parameters__'): 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

228 return None 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

229 

230 # In this case, we know that cls is a _GenericAlias, and origin is the generic type 

231 # So it is safe to access cls.__args__ and origin.__parameters__ 

232 args: tuple[Any, ...] = cls.__args__ # type: ignore 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

233 parameters: tuple[TypeVarType, ...] = origin.__parameters__ 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

234 return dict(zip(parameters, args)) 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

235 

236 

237def get_model_typevars_map(cls: type[BaseModel]) -> dict[TypeVarType, Any] | None: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

238 """Package a generic BaseModel's typevars and concrete parametrization (if present) into a dictionary compatible 

239 with the `replace_types` function. 

240 

241 Since BaseModel.__class_getitem__ does not produce a typing._GenericAlias, and the BaseModel generic info is 

242 stored in the __pydantic_generic_metadata__ attribute, we need special handling here. 

243 """ 

244 # TODO: This could be unified with `get_standard_typevars_map` if we stored the generic metadata 

245 # in the __origin__, __args__, and __parameters__ attributes of the model. 

246 generic_metadata = cls.__pydantic_generic_metadata__ 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

247 origin = generic_metadata['origin'] 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

248 args = generic_metadata['args'] 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

249 return dict(zip(iter_contained_typevars(origin), args)) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

250 

251 

252def replace_types(type_: Any, type_map: Mapping[Any, Any] | None) -> Any: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

253 """Return type with all occurrences of `type_map` keys recursively replaced with their values. 

254 

255 Args: 

256 type_: The class or generic alias. 

257 type_map: Mapping from `TypeVar` instance to concrete types. 

258 

259 Returns: 

260 A new type representing the basic structure of `type_` with all 

261 `typevar_map` keys recursively replaced. 

262 

263 Example: 

264 ```py 

265 from typing import List, Tuple, Union 

266 

267 from pydantic._internal._generics import replace_types 

268 

269 replace_types(Tuple[str, Union[List[str], float]], {str: int}) 

270 #> Tuple[int, Union[List[int], float]] 

271 ``` 

272 """ 

273 if not type_map: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

274 return type_ 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

275 

276 type_args = get_args(type_) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

277 origin_type = get_origin(type_) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

278 

279 if origin_type is typing_extensions.Annotated: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

280 annotated_type, *annotations = type_args 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

281 annotated = replace_types(annotated_type, type_map) 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

282 for annotation in annotations: 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

283 annotated = typing_extensions.Annotated[annotated, annotation] 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

284 return annotated 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

285 

286 # Having type args is a good indicator that this is a typing module 

287 # class instantiation or a generic alias of some sort. 

288 if type_args: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

289 resolved_type_args = tuple(replace_types(arg, type_map) for arg in type_args) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

290 if all_identical(type_args, resolved_type_args): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

291 # If all arguments are the same, there is no need to modify the 

292 # type or create a new object at all 

293 return type_ 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

294 if ( 1zABCLiDEFGHIJK

295 origin_type is not None 

296 and isinstance(type_, typing_base) 

297 and not isinstance(origin_type, typing_base) 

298 and getattr(type_, '_name', None) is not None 

299 ): 

300 # In python < 3.9 generic aliases don't exist so any of these like `list`, 

301 # `type` or `collections.abc.Callable` need to be translated. 

302 # See: https://www.python.org/dev/peps/pep-0585 

303 origin_type = getattr(typing, type_._name) 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

304 assert origin_type is not None 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

305 # PEP-604 syntax (Ex.: list | str) is represented with a types.UnionType object that does not have __getitem__. 

306 # We also cannot use isinstance() since we have to compare types. 

307 if sys.version_info >= (3, 10) and origin_type is types.UnionType: 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

308 return _UnionGenericAlias(origin_type, resolved_type_args) 1abcdefghijklmnopqrstuvwxy

309 # NotRequired[T] and Required[T] don't support tuple type resolved_type_args, hence the condition below 

310 return origin_type[resolved_type_args[0] if len(resolved_type_args) == 1 else resolved_type_args] 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

311 

312 # We handle pydantic generic models separately as they don't have the same 

313 # semantics as "typing" classes or generic aliases 

314 

315 if not origin_type and is_model_class(type_): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

316 parameters = type_.__pydantic_generic_metadata__['parameters'] 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

317 if not parameters: 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

318 return type_ 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

319 resolved_type_args = tuple(replace_types(t, type_map) for t in parameters) 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

320 if all_identical(parameters, resolved_type_args): 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

321 return type_ 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

322 return type_[resolved_type_args] 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

323 

324 # Handle special case for typehints that can have lists as arguments. 

325 # `typing.Callable[[int, str], int]` is an example for this. 

326 if isinstance(type_, (List, list)): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

327 resolved_list = list(replace_types(element, type_map) for element in type_) 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

328 if all_identical(type_, resolved_list): 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

329 return type_ 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

330 return resolved_list 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

331 

332 # If all else fails, we try to resolve the type directly and otherwise just 

333 # return the input with no modifications. 

334 return type_map.get(type_, type_) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

335 

336 

337def has_instance_in_type(type_: Any, isinstance_target: Any) -> bool: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

338 """Checks if the type, or any of its arbitrary nested args, satisfy 

339 `isinstance(<type>, isinstance_target)`. 

340 """ 

341 if isinstance(type_, isinstance_target): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

342 return True 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

343 

344 type_args = get_args(type_) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

345 origin_type = get_origin(type_) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

346 

347 if origin_type is typing_extensions.Annotated: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

348 annotated_type, *annotations = type_args 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

349 return has_instance_in_type(annotated_type, isinstance_target) 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

350 

351 # Having type args is a good indicator that this is a typing module 

352 # class instantiation or a generic alias of some sort. 

353 if any(has_instance_in_type(a, isinstance_target) for a in type_args): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

354 return True 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

355 

356 # Handle special case for typehints that can have lists as arguments. 

357 # `typing.Callable[[int, str], int]` is an example for this. 

358 if isinstance(type_, (List, list)) and not isinstance(type_, typing_extensions.ParamSpec): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

359 if any(has_instance_in_type(element, isinstance_target) for element in type_): 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

360 return True 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

361 

362 return False 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

363 

364 

365def check_parameters_count(cls: type[BaseModel], parameters: tuple[Any, ...]) -> None: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

366 """Check the generic model parameters count is equal. 

367 

368 Args: 

369 cls: The generic model. 

370 parameters: A tuple of passed parameters to the generic model. 

371 

372 Raises: 

373 TypeError: If the passed parameters count is not equal to generic model parameters count. 

374 """ 

375 actual = len(parameters) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

376 expected = len(cls.__pydantic_generic_metadata__['parameters']) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

377 if actual != expected: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

378 description = 'many' if actual > expected else 'few' 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

379 raise TypeError(f'Too {description} parameters for {cls}; actual {actual}, expected {expected}') 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

380 

381 

382_generic_recursion_cache: ContextVar[set[str] | None] = ContextVar('_generic_recursion_cache', default=None) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

383 

384 

385@contextmanager 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

386def generic_recursion_self_type( 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

387 origin: type[BaseModel], args: tuple[Any, ...] 

388) -> Iterator[PydanticRecursiveRef | None]: 

389 """This contextmanager should be placed around the recursive calls used to build a generic type, 

390 and accept as arguments the generic origin type and the type arguments being passed to it. 

391 

392 If the same origin and arguments are observed twice, it implies that a self-reference placeholder 

393 can be used while building the core schema, and will produce a schema_ref that will be valid in the 

394 final parent schema. 

395 """ 

396 previously_seen_type_refs = _generic_recursion_cache.get() 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

397 if previously_seen_type_refs is None: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

398 previously_seen_type_refs = set() 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

399 token = _generic_recursion_cache.set(previously_seen_type_refs) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

400 else: 

401 token = None 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

402 

403 try: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

404 type_ref = get_type_ref(origin, args_override=args) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

405 if type_ref in previously_seen_type_refs: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

406 self_type = PydanticRecursiveRef(type_ref=type_ref) 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

407 yield self_type 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

408 else: 

409 previously_seen_type_refs.add(type_ref) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

410 yield None 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

411 finally: 

412 if token: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

413 _generic_recursion_cache.reset(token) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

414 

415 

416def recursively_defined_type_refs() -> set[str]: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

417 visited = _generic_recursion_cache.get() 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

418 if not visited: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

419 return set() # not in a generic recursion, so there are no types 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

420 

421 return visited.copy() # don't allow modifications 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

422 

423 

424def get_cached_generic_type_early(parent: type[BaseModel], typevar_values: Any) -> type[BaseModel] | None: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

425 """The use of a two-stage cache lookup approach was necessary to have the highest performance possible for 

426 repeated calls to `__class_getitem__` on generic types (which may happen in tighter loops during runtime), 

427 while still ensuring that certain alternative parametrizations ultimately resolve to the same type. 

428 

429 As a concrete example, this approach was necessary to make Model[List[T]][int] equal to Model[List[int]]. 

430 The approach could be modified to not use two different cache keys at different points, but the 

431 _early_cache_key is optimized to be as quick to compute as possible (for repeated-access speed), and the 

432 _late_cache_key is optimized to be as "correct" as possible, so that two types that will ultimately be the 

433 same after resolving the type arguments will always produce cache hits. 

434 

435 If we wanted to move to only using a single cache key per type, we would either need to always use the 

436 slower/more computationally intensive logic associated with _late_cache_key, or would need to accept 

437 that Model[List[T]][int] is a different type than Model[List[T]][int]. Because we rely on subclass relationships 

438 during validation, I think it is worthwhile to ensure that types that are functionally equivalent are actually 

439 equal. 

440 """ 

441 return _GENERIC_TYPES_CACHE.get(_early_cache_key(parent, typevar_values)) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

442 

443 

444def get_cached_generic_type_late( 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

445 parent: type[BaseModel], typevar_values: Any, origin: type[BaseModel], args: tuple[Any, ...] 

446) -> type[BaseModel] | None: 

447 """See the docstring of `get_cached_generic_type_early` for more information about the two-stage cache lookup.""" 

448 cached = _GENERIC_TYPES_CACHE.get(_late_cache_key(origin, args, typevar_values)) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

449 if cached is not None: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

450 set_cached_generic_type(parent, typevar_values, cached, origin, args) 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

451 return cached 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

452 

453 

454def set_cached_generic_type( 1zABCabcdefghDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

455 parent: type[BaseModel], 

456 typevar_values: tuple[Any, ...], 

457 type_: type[BaseModel], 

458 origin: type[BaseModel] | None = None, 

459 args: tuple[Any, ...] | None = None, 

460) -> None: 

461 """See the docstring of `get_cached_generic_type_early` for more information about why items are cached with 

462 two different keys. 

463 """ 

464 _GENERIC_TYPES_CACHE[_early_cache_key(parent, typevar_values)] = type_ 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

465 if len(typevar_values) == 1: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

466 _GENERIC_TYPES_CACHE[_early_cache_key(parent, typevar_values[0])] = type_ 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

467 if origin and args: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

468 _GENERIC_TYPES_CACHE[_late_cache_key(origin, args, typevar_values)] = type_ 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

469 

470 

471def _union_orderings_key(typevar_values: Any) -> Any: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

472 """This is intended to help differentiate between Union types with the same arguments in different order. 

473 

474 Thanks to caching internal to the `typing` module, it is not possible to distinguish between 

475 List[Union[int, float]] and List[Union[float, int]] (and similarly for other "parent" origins besides List) 

476 because `typing` considers Union[int, float] to be equal to Union[float, int]. 

477 

478 However, you _can_ distinguish between (top-level) Union[int, float] vs. Union[float, int]. 

479 Because we parse items as the first Union type that is successful, we get slightly more consistent behavior 

480 if we make an effort to distinguish the ordering of items in a union. It would be best if we could _always_ 

481 get the exact-correct order of items in the union, but that would require a change to the `typing` module itself. 

482 (See https://github.com/python/cpython/issues/86483 for reference.) 

483 """ 

484 if isinstance(typevar_values, tuple): 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

485 args_data = [] 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

486 for value in typevar_values: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

487 args_data.append(_union_orderings_key(value)) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

488 return tuple(args_data) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

489 elif typing_extensions.get_origin(typevar_values) is typing.Union: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

490 return get_args(typevar_values) 1zABCabcdefghLiDEFGjklmnopqHIJKrstuvwxy

491 else: 

492 return () 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

493 

494 

495def _early_cache_key(cls: type[BaseModel], typevar_values: Any) -> GenericTypesCacheKey: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

496 """This is intended for minimal computational overhead during lookups of cached types. 

497 

498 Note that this is overly simplistic, and it's possible that two different cls/typevar_values 

499 inputs would ultimately result in the same type being created in BaseModel.__class_getitem__. 

500 To handle this, we have a fallback _late_cache_key that is checked later if the _early_cache_key 

501 lookup fails, and should result in a cache hit _precisely_ when the inputs to __class_getitem__ 

502 would result in the same type. 

503 """ 

504 return cls, typevar_values, _union_orderings_key(typevar_values) 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

505 

506 

507def _late_cache_key(origin: type[BaseModel], args: tuple[Any, ...], typevar_values: Any) -> GenericTypesCacheKey: 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy

508 """This is intended for use later in the process of creating a new type, when we have more information 

509 about the exact args that will be passed. If it turns out that a different set of inputs to 

510 __class_getitem__ resulted in the same inputs to the generic type creation process, we can still 

511 return the cached type, and update the cache with the _early_cache_key as well. 

512 """ 

513 # The _union_orderings_key is placed at the start here to ensure there cannot be a collision with an 

514 # _early_cache_key, as that function will always produce a BaseModel subclass as the first item in the key, 

515 # whereas this function will always produce a tuple as the first item in the key. 

516 return _union_orderings_key(typevar_values), origin, args 1zABCabcdefghLiDEFGjklmnopqUVMNOPQRSTHIJKrstuvwxy