Coverage for pydantic/_internal/_generics.py: 93.52%
218 statements
« prev ^ index » next coverage.py v7.5.3, created at 2024-06-21 17:00 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2024-06-21 17:00 +0000
1from __future__ import annotations 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
3import sys 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
4import types 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
5import typing 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
6from collections import ChainMap 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
7from contextlib import contextmanager 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
8from contextvars import ContextVar 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
9from types import prepare_class 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
10from typing import TYPE_CHECKING, Any, Iterator, List, Mapping, MutableMapping, Tuple, TypeVar 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
11from weakref import WeakValueDictionary 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
13import typing_extensions 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
15from ._core_utils import get_type_ref 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
16from ._forward_ref import PydanticRecursiveRef 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
17from ._typing_extra import TypeVarType, typing_base 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
18from ._utils import all_identical, is_model_class 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
20if sys.version_info >= (3, 10): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
21 from typing import _UnionGenericAlias # type: ignore[attr-defined] 1abcdefghijklmGHIJKLMnopqrs
23if TYPE_CHECKING: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
24 from ..main import BaseModel
26GenericTypesCacheKey = Tuple[Any, Any, Tuple[Any, ...]] 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
28# Note: We want to remove LimitedDict, but to do this, we'd need to improve the handling of generics caching.
29# Right now, to handle recursive generics, we some types must remain cached for brief periods without references.
30# By chaining the WeakValuesDict with a LimitedDict, we have a way to retain caching for all types with references,
31# while also retaining a limited number of types even without references. This is generally enough to build
32# specific recursive generic models without losing required items out of the cache.
34KT = TypeVar('KT') 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
35VT = TypeVar('VT') 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
36_LIMITED_DICT_SIZE = 100 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
37if TYPE_CHECKING: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
39 class LimitedDict(dict, MutableMapping[KT, VT]):
40 def __init__(self, size_limit: int = _LIMITED_DICT_SIZE): ...
42else:
44 class LimitedDict(dict): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
45 """Limit the size/length of a dict used for caching to avoid unlimited increase in memory usage.
47 Since the dict is ordered, and we always remove elements from the beginning, this is effectively a FIFO cache.
48 """
50 def __init__(self, size_limit: int = _LIMITED_DICT_SIZE): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
51 self.size_limit = size_limit 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
52 super().__init__() 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
54 def __setitem__(self, key: Any, value: Any, /) -> None: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
55 super().__setitem__(key, value) 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
56 if len(self) > self.size_limit: 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
57 excess = len(self) - self.size_limit + self.size_limit // 10 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
58 to_remove = list(self.keys())[:excess] 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
59 for k in to_remove: 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
60 del self[k] 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
63# weak dictionaries allow the dynamically created parametrized versions of generic models to get collected
64# once they are no longer referenced by the caller.
65if sys.version_info >= (3, 9): # Typing for weak dictionaries available at 3.9 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
66 GenericTypesCache = WeakValueDictionary[GenericTypesCacheKey, 'type[BaseModel]'] 1vwabcdefFgzAhijklmOGHIJKLMDEnopqrs
67else:
68 GenericTypesCache = WeakValueDictionary 1tuxyNBC
70if TYPE_CHECKING: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
72 class DeepChainMap(ChainMap[KT, VT]): # type: ignore
73 ...
75else:
77 class DeepChainMap(ChainMap): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
78 """Variant of ChainMap that allows direct updates to inner scopes.
80 Taken from https://docs.python.org/3/library/collections.html#collections.ChainMap,
81 with some light modifications for this use case.
82 """
84 def clear(self) -> None: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
85 for mapping in self.maps:
86 mapping.clear()
88 def __setitem__(self, key: KT, value: VT) -> None: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
89 for mapping in self.maps:
90 mapping[key] = value
92 def __delitem__(self, key: KT) -> None: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
93 hit = False
94 for mapping in self.maps:
95 if key in mapping:
96 del mapping[key]
97 hit = True
98 if not hit:
99 raise KeyError(key)
102# Despite the fact that LimitedDict _seems_ no longer necessary, I'm very nervous to actually remove it
103# and discover later on that we need to re-add all this infrastructure...
104# _GENERIC_TYPES_CACHE = DeepChainMap(GenericTypesCache(), LimitedDict())
106_GENERIC_TYPES_CACHE = GenericTypesCache() 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
109class PydanticGenericMetadata(typing_extensions.TypedDict): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
110 origin: type[BaseModel] | None # analogous to typing._GenericAlias.__origin__ 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
111 args: tuple[Any, ...] # analogous to typing._GenericAlias.__args__ 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
112 parameters: tuple[type[Any], ...] # analogous to typing.Generic.__parameters__ 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
115def create_generic_submodel( 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
116 model_name: str, origin: type[BaseModel], args: tuple[Any, ...], params: tuple[Any, ...]
117) -> type[BaseModel]:
118 """Dynamically create a submodel of a provided (generic) BaseModel.
120 This is used when producing concrete parametrizations of generic models. This function
121 only *creates* the new subclass; the schema/validators/serialization must be updated to
122 reflect a concrete parametrization elsewhere.
124 Args:
125 model_name: The name of the newly created model.
126 origin: The base class for the new model to inherit from.
127 args: A tuple of generic metadata arguments.
128 params: A tuple of generic metadata parameters.
130 Returns:
131 The created submodel.
132 """
133 namespace: dict[str, Any] = {'__module__': origin.__module__} 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
134 bases = (origin,) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
135 meta, ns, kwds = prepare_class(model_name, bases) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
136 namespace.update(ns) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
137 created_model = meta( 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
138 model_name,
139 bases,
140 namespace,
141 __pydantic_generic_metadata__={
142 'origin': origin,
143 'args': args,
144 'parameters': params,
145 },
146 __pydantic_reset_parent_namespace__=False,
147 **kwds,
148 )
150 model_module, called_globally = _get_caller_frame_info(depth=3) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
151 if called_globally: # create global reference and therefore allow pickling 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
152 object_by_reference = None 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
153 reference_name = model_name 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
154 reference_module_globals = sys.modules[created_model.__module__].__dict__ 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
155 while object_by_reference is not created_model: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
156 object_by_reference = reference_module_globals.setdefault(reference_name, created_model) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
157 reference_name += '_' 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
159 return created_model 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
162def _get_caller_frame_info(depth: int = 2) -> tuple[str | None, bool]: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
163 """Used inside a function to check whether it was called globally.
165 Args:
166 depth: The depth to get the frame.
168 Returns:
169 A tuple contains `module_name` and `called_globally`.
171 Raises:
172 RuntimeError: If the function is not called inside a function.
173 """
174 try: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
175 previous_caller_frame = sys._getframe(depth) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
176 except ValueError as e: 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
177 raise RuntimeError('This function must be used inside another function') from e 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
178 except AttributeError: # sys module does not have _getframe function, so there's nothing we can do about it 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
179 return None, False 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
180 frame_globals = previous_caller_frame.f_globals 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
181 return frame_globals.get('__name__'), previous_caller_frame.f_locals is frame_globals 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
184DictValues: type[Any] = {}.values().__class__ 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
187def iter_contained_typevars(v: Any) -> Iterator[TypeVarType]: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
188 """Recursively iterate through all subtypes and type args of `v` and yield any typevars that are found.
190 This is inspired as an alternative to directly accessing the `__parameters__` attribute of a GenericAlias,
191 since __parameters__ of (nested) generic BaseModel subclasses won't show up in that list.
192 """
193 if isinstance(v, TypeVar): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
194 yield v 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
195 elif is_model_class(v): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
196 yield from v.__pydantic_generic_metadata__['parameters'] 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
197 elif isinstance(v, (DictValues, list)): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
198 for var in v: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
199 yield from iter_contained_typevars(var) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
200 else:
201 args = get_args(v) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
202 for arg in args: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
203 yield from iter_contained_typevars(arg) 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
206def get_args(v: Any) -> Any: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
207 pydantic_generic_metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
208 if pydantic_generic_metadata: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
209 return pydantic_generic_metadata.get('args') 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
210 return typing_extensions.get_args(v) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
213def get_origin(v: Any) -> Any: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
214 pydantic_generic_metadata: PydanticGenericMetadata | None = getattr(v, '__pydantic_generic_metadata__', None) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
215 if pydantic_generic_metadata: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
216 return pydantic_generic_metadata.get('origin') 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
217 return typing_extensions.get_origin(v) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
220def get_standard_typevars_map(cls: type[Any]) -> dict[TypeVarType, Any] | None: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
221 """Package a generic type's typevars and parametrization (if present) into a dictionary compatible with the
222 `replace_types` function. Specifically, this works with standard typing generics and typing._GenericAlias.
223 """
224 origin = get_origin(cls) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
225 if origin is None: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
226 return None 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
227 if not hasattr(origin, '__parameters__'): 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
228 return None 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
230 # In this case, we know that cls is a _GenericAlias, and origin is the generic type
231 # So it is safe to access cls.__args__ and origin.__parameters__
232 args: tuple[Any, ...] = cls.__args__ # type: ignore 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
233 parameters: tuple[TypeVarType, ...] = origin.__parameters__ 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
234 return dict(zip(parameters, args)) 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
237def get_model_typevars_map(cls: type[BaseModel]) -> dict[TypeVarType, Any] | None: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
238 """Package a generic BaseModel's typevars and concrete parametrization (if present) into a dictionary compatible
239 with the `replace_types` function.
241 Since BaseModel.__class_getitem__ does not produce a typing._GenericAlias, and the BaseModel generic info is
242 stored in the __pydantic_generic_metadata__ attribute, we need special handling here.
243 """
244 # TODO: This could be unified with `get_standard_typevars_map` if we stored the generic metadata
245 # in the __origin__, __args__, and __parameters__ attributes of the model.
246 generic_metadata = cls.__pydantic_generic_metadata__ 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
247 origin = generic_metadata['origin'] 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
248 args = generic_metadata['args'] 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
249 return dict(zip(iter_contained_typevars(origin), args)) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
252def replace_types(type_: Any, type_map: Mapping[Any, Any] | None) -> Any: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
253 """Return type with all occurrences of `type_map` keys recursively replaced with their values.
255 Args:
256 type_: The class or generic alias.
257 type_map: Mapping from `TypeVar` instance to concrete types.
259 Returns:
260 A new type representing the basic structure of `type_` with all
261 `typevar_map` keys recursively replaced.
263 Example:
264 ```py
265 from typing import List, Tuple, Union
267 from pydantic._internal._generics import replace_types
269 replace_types(Tuple[str, Union[List[str], float]], {str: int})
270 #> Tuple[int, Union[List[int], float]]
271 ```
272 """
273 if not type_map: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
274 return type_ 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
276 type_args = get_args(type_) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
277 origin_type = get_origin(type_) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
279 if origin_type is typing_extensions.Annotated: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
280 annotated_type, *annotations = type_args 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
281 annotated = replace_types(annotated_type, type_map) 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
282 for annotation in annotations: 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
283 annotated = typing_extensions.Annotated[annotated, annotation] 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
284 return annotated 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
286 # Having type args is a good indicator that this is a typing module
287 # class instantiation or a generic alias of some sort.
288 if type_args: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
289 resolved_type_args = tuple(replace_types(arg, type_map) for arg in type_args) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
290 if all_identical(type_args, resolved_type_args): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
291 # If all arguments are the same, there is no need to modify the
292 # type or create a new object at all
293 return type_ 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
294 if ( 1tuvwFgxyzABCDE
295 origin_type is not None
296 and isinstance(type_, typing_base)
297 and not isinstance(origin_type, typing_base)
298 and getattr(type_, '_name', None) is not None
299 ):
300 # In python < 3.9 generic aliases don't exist so any of these like `list`,
301 # `type` or `collections.abc.Callable` need to be translated.
302 # See: https://www.python.org/dev/peps/pep-0585
303 origin_type = getattr(typing, type_._name) 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
304 assert origin_type is not None 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
305 # PEP-604 syntax (Ex.: list | str) is represented with a types.UnionType object that does not have __getitem__.
306 # We also cannot use isinstance() since we have to compare types.
307 if sys.version_info >= (3, 10) and origin_type is types.UnionType: 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
308 return _UnionGenericAlias(origin_type, resolved_type_args) 1abcdefghijklmnopqrs
309 # NotRequired[T] and Required[T] don't support tuple type resolved_type_args, hence the condition below
310 return origin_type[resolved_type_args[0] if len(resolved_type_args) == 1 else resolved_type_args] 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
312 # We handle pydantic generic models separately as they don't have the same
313 # semantics as "typing" classes or generic aliases
315 if not origin_type and is_model_class(type_): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
316 parameters = type_.__pydantic_generic_metadata__['parameters'] 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
317 if not parameters: 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
318 return type_ 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
319 resolved_type_args = tuple(replace_types(t, type_map) for t in parameters) 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
320 if all_identical(parameters, resolved_type_args): 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
321 return type_ 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
322 return type_[resolved_type_args] 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
324 # Handle special case for typehints that can have lists as arguments.
325 # `typing.Callable[[int, str], int]` is an example for this.
326 if isinstance(type_, (List, list)): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
327 resolved_list = list(replace_types(element, type_map) for element in type_) 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
328 if all_identical(type_, resolved_list): 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
329 return type_ 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
330 return resolved_list 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
332 # If all else fails, we try to resolve the type directly and otherwise just
333 # return the input with no modifications.
334 return type_map.get(type_, type_) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
337def has_instance_in_type(type_: Any, isinstance_target: Any) -> bool: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
338 """Checks if the type, or any of its arbitrary nested args, satisfy
339 `isinstance(<type>, isinstance_target)`.
340 """
341 if isinstance(type_, isinstance_target): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
342 return True 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
344 type_args = get_args(type_) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
345 origin_type = get_origin(type_) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
347 if origin_type is typing_extensions.Annotated: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
348 annotated_type, *annotations = type_args 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
349 return has_instance_in_type(annotated_type, isinstance_target) 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
351 # Having type args is a good indicator that this is a typing module
352 # class instantiation or a generic alias of some sort.
353 if any(has_instance_in_type(a, isinstance_target) for a in type_args): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
354 return True 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
356 # Handle special case for typehints that can have lists as arguments.
357 # `typing.Callable[[int, str], int]` is an example for this.
358 if isinstance(type_, (List, list)) and not isinstance(type_, typing_extensions.ParamSpec): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
359 if any(has_instance_in_type(element, isinstance_target) for element in type_): 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
360 return True 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
362 return False 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
365def check_parameters_count(cls: type[BaseModel], parameters: tuple[Any, ...]) -> None: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
366 """Check the generic model parameters count is equal.
368 Args:
369 cls: The generic model.
370 parameters: A tuple of passed parameters to the generic model.
372 Raises:
373 TypeError: If the passed parameters count is not equal to generic model parameters count.
374 """
375 actual = len(parameters) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
376 expected = len(cls.__pydantic_generic_metadata__['parameters']) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
377 if actual != expected: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
378 description = 'many' if actual > expected else 'few' 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
379 raise TypeError(f'Too {description} parameters for {cls}; actual {actual}, expected {expected}') 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
382_generic_recursion_cache: ContextVar[set[str] | None] = ContextVar('_generic_recursion_cache', default=None) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
385@contextmanager 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
386def generic_recursion_self_type( 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
387 origin: type[BaseModel], args: tuple[Any, ...]
388) -> Iterator[PydanticRecursiveRef | None]:
389 """This contextmanager should be placed around the recursive calls used to build a generic type,
390 and accept as arguments the generic origin type and the type arguments being passed to it.
392 If the same origin and arguments are observed twice, it implies that a self-reference placeholder
393 can be used while building the core schema, and will produce a schema_ref that will be valid in the
394 final parent schema.
395 """
396 previously_seen_type_refs = _generic_recursion_cache.get() 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
397 if previously_seen_type_refs is None: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
398 previously_seen_type_refs = set() 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
399 token = _generic_recursion_cache.set(previously_seen_type_refs) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
400 else:
401 token = None 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
403 try: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
404 type_ref = get_type_ref(origin, args_override=args) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
405 if type_ref in previously_seen_type_refs: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
406 self_type = PydanticRecursiveRef(type_ref=type_ref) 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
407 yield self_type 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
408 else:
409 previously_seen_type_refs.add(type_ref) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
410 yield None 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
411 finally:
412 if token: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
413 _generic_recursion_cache.reset(token) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
416def recursively_defined_type_refs() -> set[str]: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
417 visited = _generic_recursion_cache.get() 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
418 if not visited: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
419 return set() # not in a generic recursion, so there are no types 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
421 return visited.copy() # don't allow modifications 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
424def get_cached_generic_type_early(parent: type[BaseModel], typevar_values: Any) -> type[BaseModel] | None: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
425 """The use of a two-stage cache lookup approach was necessary to have the highest performance possible for
426 repeated calls to `__class_getitem__` on generic types (which may happen in tighter loops during runtime),
427 while still ensuring that certain alternative parametrizations ultimately resolve to the same type.
429 As a concrete example, this approach was necessary to make Model[List[T]][int] equal to Model[List[int]].
430 The approach could be modified to not use two different cache keys at different points, but the
431 _early_cache_key is optimized to be as quick to compute as possible (for repeated-access speed), and the
432 _late_cache_key is optimized to be as "correct" as possible, so that two types that will ultimately be the
433 same after resolving the type arguments will always produce cache hits.
435 If we wanted to move to only using a single cache key per type, we would either need to always use the
436 slower/more computationally intensive logic associated with _late_cache_key, or would need to accept
437 that Model[List[T]][int] is a different type than Model[List[T]][int]. Because we rely on subclass relationships
438 during validation, I think it is worthwhile to ensure that types that are functionally equivalent are actually
439 equal.
440 """
441 return _GENERIC_TYPES_CACHE.get(_early_cache_key(parent, typevar_values)) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
444def get_cached_generic_type_late( 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
445 parent: type[BaseModel], typevar_values: Any, origin: type[BaseModel], args: tuple[Any, ...]
446) -> type[BaseModel] | None:
447 """See the docstring of `get_cached_generic_type_early` for more information about the two-stage cache lookup."""
448 cached = _GENERIC_TYPES_CACHE.get(_late_cache_key(origin, args, typevar_values)) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
449 if cached is not None: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
450 set_cached_generic_type(parent, typevar_values, cached, origin, args) 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
451 return cached 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
454def set_cached_generic_type( 1tuvwabcdefxyzAhijklmNOGHIJKLMBCDEnopqrs
455 parent: type[BaseModel],
456 typevar_values: tuple[Any, ...],
457 type_: type[BaseModel],
458 origin: type[BaseModel] | None = None,
459 args: tuple[Any, ...] | None = None,
460) -> None:
461 """See the docstring of `get_cached_generic_type_early` for more information about why items are cached with
462 two different keys.
463 """
464 _GENERIC_TYPES_CACHE[_early_cache_key(parent, typevar_values)] = type_ 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
465 if len(typevar_values) == 1: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
466 _GENERIC_TYPES_CACHE[_early_cache_key(parent, typevar_values[0])] = type_ 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
467 if origin and args: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
468 _GENERIC_TYPES_CACHE[_late_cache_key(origin, args, typevar_values)] = type_ 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
471def _union_orderings_key(typevar_values: Any) -> Any: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
472 """This is intended to help differentiate between Union types with the same arguments in different order.
474 Thanks to caching internal to the `typing` module, it is not possible to distinguish between
475 List[Union[int, float]] and List[Union[float, int]] (and similarly for other "parent" origins besides List)
476 because `typing` considers Union[int, float] to be equal to Union[float, int].
478 However, you _can_ distinguish between (top-level) Union[int, float] vs. Union[float, int].
479 Because we parse items as the first Union type that is successful, we get slightly more consistent behavior
480 if we make an effort to distinguish the ordering of items in a union. It would be best if we could _always_
481 get the exact-correct order of items in the union, but that would require a change to the `typing` module itself.
482 (See https://github.com/python/cpython/issues/86483 for reference.)
483 """
484 if isinstance(typevar_values, tuple): 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
485 args_data = [] 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
486 for value in typevar_values: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
487 args_data.append(_union_orderings_key(value)) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
488 return tuple(args_data) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
489 elif typing_extensions.get_origin(typevar_values) is typing.Union: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
490 return get_args(typevar_values) 1tuvwabcdefFgxyzAhijklmBCDEnopqrs
491 else:
492 return () 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
495def _early_cache_key(cls: type[BaseModel], typevar_values: Any) -> GenericTypesCacheKey: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
496 """This is intended for minimal computational overhead during lookups of cached types.
498 Note that this is overly simplistic, and it's possible that two different cls/typevar_values
499 inputs would ultimately result in the same type being created in BaseModel.__class_getitem__.
500 To handle this, we have a fallback _late_cache_key that is checked later if the _early_cache_key
501 lookup fails, and should result in a cache hit _precisely_ when the inputs to __class_getitem__
502 would result in the same type.
503 """
504 return cls, typevar_values, _union_orderings_key(typevar_values) 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
507def _late_cache_key(origin: type[BaseModel], args: tuple[Any, ...], typevar_values: Any) -> GenericTypesCacheKey: 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs
508 """This is intended for use later in the process of creating a new type, when we have more information
509 about the exact args that will be passed. If it turns out that a different set of inputs to
510 __class_getitem__ resulted in the same inputs to the generic type creation process, we can still
511 return the cached type, and update the cache with the _early_cache_key as well.
512 """
513 # The _union_orderings_key is placed at the start here to ensure there cannot be a collision with an
514 # _early_cache_key, as that function will always produce a BaseModel subclass as the first item in the key,
515 # whereas this function will always produce a tuple as the first item in the key.
516 return _union_orderings_key(typevar_values), origin, args 1tuvwabcdefFgxyzAhijklmNOGHIJKLMBCDEnopqrs