Coverage for pydantic/_internal/_decorators.py: 97.58%
324 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-02 11:05 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-02 11:05 +0000
1"""Logic related to validators applied to models etc. via the `@field_validator` and `@model_validator` decorators."""
3from __future__ import annotations as _annotations 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
5import types 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
6from collections import deque 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
7from collections.abc import Iterable 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
8from dataclasses import dataclass, field 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
9from functools import cached_property, partial, partialmethod 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
10from inspect import Parameter, Signature, isdatadescriptor, ismethoddescriptor, signature 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
11from itertools import islice 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
12from typing import TYPE_CHECKING, Any, Callable, ClassVar, Generic, Literal, TypeVar, Union 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
14from pydantic_core import PydanticUndefined, PydanticUndefinedType, core_schema 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
15from typing_extensions import TypeAlias, is_typeddict 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
17from ..errors import PydanticUserError 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
18from ._core_utils import get_type_ref 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
19from ._internal_dataclass import slots_true 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
20from ._namespace_utils import GlobalsNamespace, MappingNamespace 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
21from ._typing_extra import get_function_type_hints 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
22from ._utils import can_be_positional 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
24if TYPE_CHECKING: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
25 from ..fields import ComputedFieldInfo
26 from ..functional_validators import FieldValidatorModes
27 from ._config import ConfigWrapper
30@dataclass(**slots_true) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
31class ValidatorDecoratorInfo: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
32 """A container for data from `@validator` so that we can access it
33 while building the pydantic-core schema.
35 Attributes:
36 decorator_repr: A class variable representing the decorator string, '@validator'.
37 fields: A tuple of field names the validator should be called on.
38 mode: The proposed validator mode.
39 each_item: For complex objects (sets, lists etc.) whether to validate individual
40 elements rather than the whole object.
41 always: Whether this method and other validators should be called even if the value is missing.
42 check_fields: Whether to check that the fields actually exist on the model.
43 """
45 decorator_repr: ClassVar[str] = '@validator' 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
47 fields: tuple[str, ...] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
48 mode: Literal['before', 'after'] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
49 each_item: bool 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
50 always: bool 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
51 check_fields: bool | None 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
54@dataclass(**slots_true) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
55class FieldValidatorDecoratorInfo: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
56 """A container for data from `@field_validator` so that we can access it
57 while building the pydantic-core schema.
59 Attributes:
60 decorator_repr: A class variable representing the decorator string, '@field_validator'.
61 fields: A tuple of field names the validator should be called on.
62 mode: The proposed validator mode.
63 check_fields: Whether to check that the fields actually exist on the model.
64 json_schema_input_type: The input type of the function. This is only used to generate
65 the appropriate JSON Schema (in validation mode) and can only specified
66 when `mode` is either `'before'`, `'plain'` or `'wrap'`.
67 """
69 decorator_repr: ClassVar[str] = '@field_validator' 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
71 fields: tuple[str, ...] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
72 mode: FieldValidatorModes 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
73 check_fields: bool | None 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
74 json_schema_input_type: Any 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
77@dataclass(**slots_true) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
78class RootValidatorDecoratorInfo: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
79 """A container for data from `@root_validator` so that we can access it
80 while building the pydantic-core schema.
82 Attributes:
83 decorator_repr: A class variable representing the decorator string, '@root_validator'.
84 mode: The proposed validator mode.
85 """
87 decorator_repr: ClassVar[str] = '@root_validator' 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
88 mode: Literal['before', 'after'] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
91@dataclass(**slots_true) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
92class FieldSerializerDecoratorInfo: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
93 """A container for data from `@field_serializer` so that we can access it
94 while building the pydantic-core schema.
96 Attributes:
97 decorator_repr: A class variable representing the decorator string, '@field_serializer'.
98 fields: A tuple of field names the serializer should be called on.
99 mode: The proposed serializer mode.
100 return_type: The type of the serializer's return value.
101 when_used: The serialization condition. Accepts a string with values `'always'`, `'unless-none'`, `'json'`,
102 and `'json-unless-none'`.
103 check_fields: Whether to check that the fields actually exist on the model.
104 """
106 decorator_repr: ClassVar[str] = '@field_serializer' 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
107 fields: tuple[str, ...] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
108 mode: Literal['plain', 'wrap'] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
109 return_type: Any 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
110 when_used: core_schema.WhenUsed 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
111 check_fields: bool | None 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
114@dataclass(**slots_true) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
115class ModelSerializerDecoratorInfo: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
116 """A container for data from `@model_serializer` so that we can access it
117 while building the pydantic-core schema.
119 Attributes:
120 decorator_repr: A class variable representing the decorator string, '@model_serializer'.
121 mode: The proposed serializer mode.
122 return_type: The type of the serializer's return value.
123 when_used: The serialization condition. Accepts a string with values `'always'`, `'unless-none'`, `'json'`,
124 and `'json-unless-none'`.
125 """
127 decorator_repr: ClassVar[str] = '@model_serializer' 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
128 mode: Literal['plain', 'wrap'] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
129 return_type: Any 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
130 when_used: core_schema.WhenUsed 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
133@dataclass(**slots_true) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
134class ModelValidatorDecoratorInfo: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
135 """A container for data from `@model_validator` so that we can access it
136 while building the pydantic-core schema.
138 Attributes:
139 decorator_repr: A class variable representing the decorator string, '@model_validator'.
140 mode: The proposed serializer mode.
141 """
143 decorator_repr: ClassVar[str] = '@model_validator' 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
144 mode: Literal['wrap', 'before', 'after'] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
147DecoratorInfo: TypeAlias = """Union[ 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
148 ValidatorDecoratorInfo,
149 FieldValidatorDecoratorInfo,
150 RootValidatorDecoratorInfo,
151 FieldSerializerDecoratorInfo,
152 ModelSerializerDecoratorInfo,
153 ModelValidatorDecoratorInfo,
154 ComputedFieldInfo,
155]"""
157ReturnType = TypeVar('ReturnType') 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
158DecoratedType: TypeAlias = ( 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
159 'Union[classmethod[Any, Any, ReturnType], staticmethod[Any, ReturnType], Callable[..., ReturnType], property]'
160)
163@dataclass # can't use slots here since we set attributes on `__post_init__` 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
164class PydanticDescriptorProxy(Generic[ReturnType]): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
165 """Wrap a classmethod, staticmethod, property or unbound function
166 and act as a descriptor that allows us to detect decorated items
167 from the class' attributes.
169 This class' __get__ returns the wrapped item's __get__ result,
170 which makes it transparent for classmethods and staticmethods.
172 Attributes:
173 wrapped: The decorator that has to be wrapped.
174 decorator_info: The decorator info.
175 shim: A wrapper function to wrap V1 style function.
176 """
178 wrapped: DecoratedType[ReturnType] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
179 decorator_info: DecoratorInfo 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
180 shim: Callable[[Callable[..., Any]], Callable[..., Any]] | None = None 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
182 def __post_init__(self): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
183 for attr in 'setter', 'deleter': 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
184 if hasattr(self.wrapped, attr): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
185 f = partial(self._call_wrapped_attr, name=attr) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
186 setattr(self, attr, f) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
188 def _call_wrapped_attr(self, func: Callable[[Any], None], *, name: str) -> PydanticDescriptorProxy[ReturnType]: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
189 self.wrapped = getattr(self.wrapped, name)(func) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
190 if isinstance(self.wrapped, property): 190 ↛ 196line 190 didn't jump to line 196 because the condition on line 190 was always true1stabcdefABCzyuvghijklDEFwxmnopqrGHI
191 # update ComputedFieldInfo.wrapped_property
192 from ..fields import ComputedFieldInfo 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
194 if isinstance(self.decorator_info, ComputedFieldInfo): 194 ↛ 196line 194 didn't jump to line 196 because the condition on line 194 was always true1stabcdefABCzyuvghijklDEFwxmnopqrGHI
195 self.decorator_info.wrapped_property = self.wrapped 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
196 return self 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
198 def __get__(self, obj: object | None, obj_type: type[object] | None = None) -> PydanticDescriptorProxy[ReturnType]: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
199 try: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
200 return self.wrapped.__get__(obj, obj_type) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
201 except AttributeError: 1stabcdefzyuvghijklwxmnopqr
202 # not a descriptor, e.g. a partial object
203 return self.wrapped # type: ignore[return-value] 1stabcdefzyuvghijklwxmnopqr
205 def __set_name__(self, instance: Any, name: str) -> None: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
206 if hasattr(self.wrapped, '__set_name__'): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
207 self.wrapped.__set_name__(instance, name) # pyright: ignore[reportFunctionMemberAccess] 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
209 def __getattr__(self, name: str, /) -> Any: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
210 """Forward checks for __isabstractmethod__ and such."""
211 return getattr(self.wrapped, name) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
214DecoratorInfoType = TypeVar('DecoratorInfoType', bound=DecoratorInfo) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
217@dataclass(**slots_true) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
218class Decorator(Generic[DecoratorInfoType]): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
219 """A generic container class to join together the decorator metadata
220 (metadata from decorator itself, which we have when the
221 decorator is called but not when we are building the core-schema)
222 and the bound function (which we have after the class itself is created).
224 Attributes:
225 cls_ref: The class ref.
226 cls_var_name: The decorated function name.
227 func: The decorated function.
228 shim: A wrapper function to wrap V1 style function.
229 info: The decorator info.
230 """
232 cls_ref: str 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
233 cls_var_name: str 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
234 func: Callable[..., Any] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
235 shim: Callable[[Any], Any] | None 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
236 info: DecoratorInfoType 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
238 @staticmethod 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
239 def build( 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
240 cls_: Any,
241 *,
242 cls_var_name: str,
243 shim: Callable[[Any], Any] | None,
244 info: DecoratorInfoType,
245 ) -> Decorator[DecoratorInfoType]:
246 """Build a new decorator.
248 Args:
249 cls_: The class.
250 cls_var_name: The decorated function name.
251 shim: A wrapper function to wrap V1 style function.
252 info: The decorator info.
254 Returns:
255 The new decorator instance.
256 """
257 func = get_attribute_from_bases(cls_, cls_var_name) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
258 if shim is not None: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
259 func = shim(func) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
260 func = unwrap_wrapped_function(func, unwrap_partial=False) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
261 if not callable(func): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
262 # This branch will get hit for classmethod properties
263 attribute = get_attribute_from_base_dicts(cls_, cls_var_name) # prevents the binding call to `__get__` 1stabcdefzyuvghijklwxmnopqr
264 if isinstance(attribute, PydanticDescriptorProxy): 264 ↛ 266line 264 didn't jump to line 266 because the condition on line 264 was always true1stabcdefzyuvghijklwxmnopqr
265 func = unwrap_wrapped_function(attribute.wrapped) 1stabcdefzyuvghijklwxmnopqr
266 return Decorator( 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
267 cls_ref=get_type_ref(cls_),
268 cls_var_name=cls_var_name,
269 func=func,
270 shim=shim,
271 info=info,
272 )
274 def bind_to_cls(self, cls: Any) -> Decorator[DecoratorInfoType]: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
275 """Bind the decorator to a class.
277 Args:
278 cls: the class.
280 Returns:
281 The new decorator instance.
282 """
283 return self.build( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
284 cls,
285 cls_var_name=self.cls_var_name,
286 shim=self.shim,
287 info=self.info,
288 )
291def get_bases(tp: type[Any]) -> tuple[type[Any], ...]: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
292 """Get the base classes of a class or typeddict.
294 Args:
295 tp: The type or class to get the bases.
297 Returns:
298 The base classes.
299 """
300 if is_typeddict(tp): 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
301 return tp.__orig_bases__ # type: ignore 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
302 try: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
303 return tp.__bases__ 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
304 except AttributeError: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
305 return () 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
308def mro(tp: type[Any]) -> tuple[type[Any], ...]: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
309 """Calculate the Method Resolution Order of bases using the C3 algorithm.
311 See https://www.python.org/download/releases/2.3/mro/
312 """
313 # try to use the existing mro, for performance mainly
314 # but also because it helps verify the implementation below
315 if not is_typeddict(tp): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
316 try: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
317 return tp.__mro__ 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
318 except AttributeError: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
319 # GenericAlias and some other cases
320 pass 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
322 bases = get_bases(tp) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
323 return (tp,) + mro_for_bases(bases) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
326def mro_for_bases(bases: tuple[type[Any], ...]) -> tuple[type[Any], ...]: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
327 def merge_seqs(seqs: list[deque[type[Any]]]) -> Iterable[type[Any]]: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
328 while True: 1abcdefABCyghijklDEFJKLMNOmnopqrGHI
329 non_empty = [seq for seq in seqs if seq] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
330 if not non_empty: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
331 # Nothing left to process, we're done.
332 return 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
333 candidate: type[Any] | None = None 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
334 for seq in non_empty: # Find merge candidates among seq heads. 334 ↛ 342line 334 didn't jump to line 342 because the loop on line 334 didn't complete1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
335 candidate = seq[0] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
336 not_head = [s for s in non_empty if candidate in islice(s, 1, None)] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
337 if not_head: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
338 # Reject the candidate.
339 candidate = None 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
340 else:
341 break 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
342 if not candidate: 342 ↛ 343line 342 didn't jump to line 343 because the condition on line 342 was never true1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
343 raise TypeError('Inconsistent hierarchy, no C3 MRO is possible')
344 yield candidate 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
345 for seq in non_empty: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
346 # Remove candidate.
347 if seq[0] == candidate: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
348 seq.popleft() 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
350 seqs = [deque(mro(base)) for base in bases] + [deque(bases)] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
351 return tuple(merge_seqs(seqs)) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
354_sentinel = object() 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
357def get_attribute_from_bases(tp: type[Any] | tuple[type[Any], ...], name: str) -> Any: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
358 """Get the attribute from the next class in the MRO that has it,
359 aiming to simulate calling the method on the actual class.
361 The reason for iterating over the mro instead of just getting
362 the attribute (which would do that for us) is to support TypedDict,
363 which lacks a real __mro__, but can have a virtual one constructed
364 from its bases (as done here).
366 Args:
367 tp: The type or class to search for the attribute. If a tuple, this is treated as a set of base classes.
368 name: The name of the attribute to retrieve.
370 Returns:
371 Any: The attribute value, if found.
373 Raises:
374 AttributeError: If the attribute is not found in any class in the MRO.
375 """
376 if isinstance(tp, tuple): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
377 for base in mro_for_bases(tp): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
378 attribute = base.__dict__.get(name, _sentinel) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
379 if attribute is not _sentinel: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
380 attribute_get = getattr(attribute, '__get__', None) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
381 if attribute_get is not None: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
382 return attribute_get(None, tp) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
383 return attribute 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
384 raise AttributeError(f'{name} not found in {tp}') 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
385 else:
386 try: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
387 return getattr(tp, name) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
388 except AttributeError: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
389 return get_attribute_from_bases(mro(tp), name) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
392def get_attribute_from_base_dicts(tp: type[Any], name: str) -> Any: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
393 """Get an attribute out of the `__dict__` following the MRO.
394 This prevents the call to `__get__` on the descriptor, and allows
395 us to get the original function for classmethod properties.
397 Args:
398 tp: The type or class to search for the attribute.
399 name: The name of the attribute to retrieve.
401 Returns:
402 Any: The attribute value, if found.
404 Raises:
405 KeyError: If the attribute is not found in any class's `__dict__` in the MRO.
406 """
407 for base in reversed(mro(tp)): 407 ↛ 410line 407 didn't jump to line 410 because the loop on line 407 didn't complete1stabcdefzyuvghijklwxmnopqr
408 if name in base.__dict__: 1stabcdefzyuvghijklwxmnopqr
409 return base.__dict__[name] 1stabcdefzyuvghijklwxmnopqr
410 return tp.__dict__[name] # raise the error
413@dataclass(**slots_true) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
414class DecoratorInfos: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
415 """Mapping of name in the class namespace to decorator info.
417 note that the name in the class namespace is the function or attribute name
418 not the field name!
419 """
421 validators: dict[str, Decorator[ValidatorDecoratorInfo]] = field(default_factory=dict) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
422 field_validators: dict[str, Decorator[FieldValidatorDecoratorInfo]] = field(default_factory=dict) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
423 root_validators: dict[str, Decorator[RootValidatorDecoratorInfo]] = field(default_factory=dict) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
424 field_serializers: dict[str, Decorator[FieldSerializerDecoratorInfo]] = field(default_factory=dict) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
425 model_serializers: dict[str, Decorator[ModelSerializerDecoratorInfo]] = field(default_factory=dict) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
426 model_validators: dict[str, Decorator[ModelValidatorDecoratorInfo]] = field(default_factory=dict) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
427 computed_fields: dict[str, Decorator[ComputedFieldInfo]] = field(default_factory=dict) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
429 @staticmethod 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
430 def build(model_dc: type[Any]) -> DecoratorInfos: # noqa: C901 (ignore complexity) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
431 """We want to collect all DecFunc instances that exist as
432 attributes in the namespace of the class (a BaseModel or dataclass)
433 that called us
434 But we want to collect these in the order of the bases
435 So instead of getting them all from the leaf class (the class that called us),
436 we traverse the bases from root (the oldest ancestor class) to leaf
437 and collect all of the instances as we go, taking care to replace
438 any duplicate ones with the last one we see to mimic how function overriding
439 works with inheritance.
440 If we do replace any functions we put the replacement into the position
441 the replaced function was in; that is, we maintain the order.
442 """
443 # reminder: dicts are ordered and replacement does not alter the order
444 res = DecoratorInfos() 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
445 for base in reversed(mro(model_dc)[1:]): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
446 existing: DecoratorInfos | None = base.__dict__.get('__pydantic_decorators__') 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
447 if existing is None: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
448 existing = DecoratorInfos.build(base) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
449 res.validators.update({k: v.bind_to_cls(model_dc) for k, v in existing.validators.items()}) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
450 res.field_validators.update({k: v.bind_to_cls(model_dc) for k, v in existing.field_validators.items()}) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
451 res.root_validators.update({k: v.bind_to_cls(model_dc) for k, v in existing.root_validators.items()}) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
452 res.field_serializers.update({k: v.bind_to_cls(model_dc) for k, v in existing.field_serializers.items()}) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
453 res.model_serializers.update({k: v.bind_to_cls(model_dc) for k, v in existing.model_serializers.items()}) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
454 res.model_validators.update({k: v.bind_to_cls(model_dc) for k, v in existing.model_validators.items()}) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
455 res.computed_fields.update({k: v.bind_to_cls(model_dc) for k, v in existing.computed_fields.items()}) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
457 to_replace: list[tuple[str, Any]] = [] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
459 for var_name, var_value in vars(model_dc).items(): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
460 if isinstance(var_value, PydanticDescriptorProxy): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
461 info = var_value.decorator_info 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
462 if isinstance(info, ValidatorDecoratorInfo): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
463 res.validators[var_name] = Decorator.build( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
464 model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
465 )
466 elif isinstance(info, FieldValidatorDecoratorInfo): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
467 res.field_validators[var_name] = Decorator.build( 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
468 model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
469 )
470 elif isinstance(info, RootValidatorDecoratorInfo): 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
471 res.root_validators[var_name] = Decorator.build( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
472 model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
473 )
474 elif isinstance(info, FieldSerializerDecoratorInfo): 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
475 # check whether a serializer function is already registered for fields
476 for field_serializer_decorator in res.field_serializers.values(): 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
477 # check that each field has at most one serializer function.
478 # serializer functions for the same field in subclasses are allowed,
479 # and are treated as overrides
480 if field_serializer_decorator.cls_var_name == var_name: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
481 continue 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
482 for f in info.fields: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
483 if f in field_serializer_decorator.info.fields: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
484 raise PydanticUserError( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
485 'Multiple field serializer functions were defined '
486 f'for field {f!r}, this is not allowed.',
487 code='multiple-field-serializers',
488 )
489 res.field_serializers[var_name] = Decorator.build( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
490 model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
491 )
492 elif isinstance(info, ModelValidatorDecoratorInfo): 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
493 res.model_validators[var_name] = Decorator.build( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
494 model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
495 )
496 elif isinstance(info, ModelSerializerDecoratorInfo): 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
497 res.model_serializers[var_name] = Decorator.build( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
498 model_dc, cls_var_name=var_name, shim=var_value.shim, info=info
499 )
500 else:
501 from ..fields import ComputedFieldInfo 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
503 isinstance(var_value, ComputedFieldInfo) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
504 res.computed_fields[var_name] = Decorator.build( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
505 model_dc, cls_var_name=var_name, shim=None, info=info
506 )
507 to_replace.append((var_name, var_value.wrapped)) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
508 if to_replace: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
509 # If we can save `__pydantic_decorators__` on the class we'll be able to check for it above
510 # so then we don't need to re-process the type, which means we can discard our descriptor wrappers
511 # and replace them with the thing they are wrapping (see the other setattr call below)
512 # which allows validator class methods to also function as regular class methods
513 model_dc.__pydantic_decorators__ = res 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
514 for name, value in to_replace: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
515 setattr(model_dc, name, value) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
516 return res 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
518 def update_from_config(self, config_wrapper: ConfigWrapper) -> None: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
519 """Update the decorator infos from the configuration of the class they are attached to."""
520 for name, computed_field_dec in self.computed_fields.items(): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
521 computed_field_dec.info._update_from_config(config_wrapper, name) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
524def inspect_validator(validator: Callable[..., Any], mode: FieldValidatorModes) -> bool: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
525 """Look at a field or model validator function and determine whether it takes an info argument.
527 An error is raised if the function has an invalid signature.
529 Args:
530 validator: The validator function to inspect.
531 mode: The proposed validator mode.
533 Returns:
534 Whether the validator takes an info argument.
535 """
536 try: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
537 sig = signature(validator) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
538 except (ValueError, TypeError): 1stabcdefABCuvghijklDEFwxmnopqrGHI
539 # `inspect.signature` might not be able to infer a signature, e.g. with C objects.
540 # In this case, we assume no info argument is present:
541 return False 1stabcdefABCuvghijklDEFwxmnopqrGHI
542 n_positional = count_positional_required_params(sig) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
543 if mode == 'wrap': 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
544 if n_positional == 3: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
545 return True 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
546 elif n_positional == 2: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
547 return False 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
548 else:
549 assert mode in {'before', 'after', 'plain'}, f"invalid mode: {mode!r}, expected 'before', 'after' or 'plain" 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
550 if n_positional == 2: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
551 return True 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
552 elif n_positional == 1: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
553 return False 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
555 raise PydanticUserError( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
556 f'Unrecognized field_validator function signature for {validator} with `mode={mode}`:{sig}',
557 code='validator-signature',
558 )
561def inspect_field_serializer(serializer: Callable[..., Any], mode: Literal['plain', 'wrap']) -> tuple[bool, bool]: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
562 """Look at a field serializer function and determine if it is a field serializer,
563 and whether it takes an info argument.
565 An error is raised if the function has an invalid signature.
567 Args:
568 serializer: The serializer function to inspect.
569 mode: The serializer mode, either 'plain' or 'wrap'.
571 Returns:
572 Tuple of (is_field_serializer, info_arg).
573 """
574 try: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
575 sig = signature(serializer) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
576 except (ValueError, TypeError):
577 # `inspect.signature` might not be able to infer a signature, e.g. with C objects.
578 # In this case, we assume no info argument is present and this is not a method:
579 return (False, False)
581 first = next(iter(sig.parameters.values()), None) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
582 is_field_serializer = first is not None and first.name == 'self' 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
584 n_positional = count_positional_required_params(sig) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
585 if is_field_serializer: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
586 # -1 to correct for self parameter
587 info_arg = _serializer_info_arg(mode, n_positional - 1) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
588 else:
589 info_arg = _serializer_info_arg(mode, n_positional) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
591 if info_arg is None: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
592 raise PydanticUserError( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
593 f'Unrecognized field_serializer function signature for {serializer} with `mode={mode}`:{sig}',
594 code='field-serializer-signature',
595 )
597 return is_field_serializer, info_arg 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
600def inspect_annotated_serializer(serializer: Callable[..., Any], mode: Literal['plain', 'wrap']) -> bool: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
601 """Look at a serializer function used via `Annotated` and determine whether it takes an info argument.
603 An error is raised if the function has an invalid signature.
605 Args:
606 serializer: The serializer function to check.
607 mode: The serializer mode, either 'plain' or 'wrap'.
609 Returns:
610 info_arg
611 """
612 try: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
613 sig = signature(serializer) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
614 except (ValueError, TypeError): 1stabcdefABCuvghijklDEFwxmnopqrGHI
615 # `inspect.signature` might not be able to infer a signature, e.g. with C objects.
616 # In this case, we assume no info argument is present:
617 return False 1stabcdefABCuvghijklDEFwxmnopqrGHI
618 info_arg = _serializer_info_arg(mode, count_positional_required_params(sig)) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
619 if info_arg is None: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
620 raise PydanticUserError( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
621 f'Unrecognized field_serializer function signature for {serializer} with `mode={mode}`:{sig}',
622 code='field-serializer-signature',
623 )
624 else:
625 return info_arg 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
628def inspect_model_serializer(serializer: Callable[..., Any], mode: Literal['plain', 'wrap']) -> bool: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
629 """Look at a model serializer function and determine whether it takes an info argument.
631 An error is raised if the function has an invalid signature.
633 Args:
634 serializer: The serializer function to check.
635 mode: The serializer mode, either 'plain' or 'wrap'.
637 Returns:
638 `info_arg` - whether the function expects an info argument.
639 """
640 if isinstance(serializer, (staticmethod, classmethod)) or not is_instance_method_from_sig(serializer): 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
641 raise PydanticUserError( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
642 '`@model_serializer` must be applied to instance methods', code='model-serializer-instance-method'
643 )
645 sig = signature(serializer) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
646 info_arg = _serializer_info_arg(mode, count_positional_required_params(sig)) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
647 if info_arg is None: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
648 raise PydanticUserError( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
649 f'Unrecognized model_serializer function signature for {serializer} with `mode={mode}`:{sig}',
650 code='model-serializer-signature',
651 )
652 else:
653 return info_arg 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
656def _serializer_info_arg(mode: Literal['plain', 'wrap'], n_positional: int) -> bool | None: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
657 if mode == 'plain': 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
658 if n_positional == 1: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
659 # (input_value: Any, /) -> Any
660 return False 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
661 elif n_positional == 2: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
662 # (model: Any, input_value: Any, /) -> Any
663 return True 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
664 else:
665 assert mode == 'wrap', f"invalid mode: {mode!r}, expected 'plain' or 'wrap'" 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
666 if n_positional == 2: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
667 # (input_value: Any, serializer: SerializerFunctionWrapHandler, /) -> Any
668 return False 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
669 elif n_positional == 3: 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
670 # (input_value: Any, serializer: SerializerFunctionWrapHandler, info: SerializationInfo, /) -> Any
671 return True 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
673 return None 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
676AnyDecoratorCallable: TypeAlias = ( 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
677 'Union[classmethod[Any, Any, Any], staticmethod[Any, Any], partialmethod[Any], Callable[..., Any]]'
678)
681def is_instance_method_from_sig(function: AnyDecoratorCallable) -> bool: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
682 """Whether the function is an instance method.
684 It will consider a function as instance method if the first parameter of
685 function is `self`.
687 Args:
688 function: The function to check.
690 Returns:
691 `True` if the function is an instance method, `False` otherwise.
692 """
693 sig = signature(unwrap_wrapped_function(function)) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
694 first = next(iter(sig.parameters.values()), None) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
695 if first and first.name == 'self': 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
696 return True 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
697 return False 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
700def ensure_classmethod_based_on_signature(function: AnyDecoratorCallable) -> Any: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
701 """Apply the `@classmethod` decorator on the function.
703 Args:
704 function: The function to apply the decorator on.
706 Return:
707 The `@classmethod` decorator applied function.
708 """
709 if not isinstance( 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
710 unwrap_wrapped_function(function, unwrap_class_static_method=False), classmethod
711 ) and _is_classmethod_from_sig(function):
712 return classmethod(function) # type: ignore[arg-type] 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
713 return function 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
716def _is_classmethod_from_sig(function: AnyDecoratorCallable) -> bool: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
717 sig = signature(unwrap_wrapped_function(function)) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
718 first = next(iter(sig.parameters.values()), None) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
719 if first and first.name == 'cls': 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
720 return True 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
721 return False 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
724def unwrap_wrapped_function( 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
725 func: Any,
726 *,
727 unwrap_partial: bool = True,
728 unwrap_class_static_method: bool = True,
729) -> Any:
730 """Recursively unwraps a wrapped function until the underlying function is reached.
731 This handles property, functools.partial, functools.partialmethod, staticmethod, and classmethod.
733 Args:
734 func: The function to unwrap.
735 unwrap_partial: If True (default), unwrap partial and partialmethod decorators.
736 unwrap_class_static_method: If True (default), also unwrap classmethod and staticmethod
737 decorators. If False, only unwrap partial and partialmethod decorators.
739 Returns:
740 The underlying function of the wrapped function.
741 """
742 # Define the types we want to check against as a single tuple.
743 unwrap_types = ( 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
744 (property, cached_property)
745 + ((partial, partialmethod) if unwrap_partial else ())
746 + ((staticmethod, classmethod) if unwrap_class_static_method else ())
747 )
749 while isinstance(func, unwrap_types): 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
750 if unwrap_class_static_method and isinstance(func, (classmethod, staticmethod)): 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
751 func = func.__func__ 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
752 elif isinstance(func, (partial, partialmethod)): 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
753 func = func.func 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
754 elif isinstance(func, property): 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
755 func = func.fget # arbitrary choice, convenient for computed fields 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
756 else:
757 # Make coverage happy as it can only get here in the last possible case
758 assert isinstance(func, cached_property) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
759 func = func.func # type: ignore 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
761 return func 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
764_function_like = ( 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
765 partial,
766 partialmethod,
767 types.FunctionType,
768 types.BuiltinFunctionType,
769 types.MethodType,
770 types.WrapperDescriptorType,
771 types.MethodWrapperType,
772 types.MemberDescriptorType,
773)
776def get_callable_return_type( 1stabcdefABCuvghijklDEFPJKLMNOwxmnopqrGHI
777 callable_obj: Any,
778 globalns: GlobalsNamespace | None = None,
779 localns: MappingNamespace | None = None,
780) -> Any | PydanticUndefinedType:
781 """Get the callable return type.
783 Args:
784 callable_obj: The callable to analyze.
785 globalns: The globals namespace to use during type annotation evaluation.
786 localns: The locals namespace to use during type annotation evaluation.
788 Returns:
789 The function return type.
790 """
791 if isinstance(callable_obj, type): 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
792 # types are callables, and we assume the return type
793 # is the type itself (e.g. `int()` results in an instance of `int`).
794 return callable_obj 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
796 if not isinstance(callable_obj, _function_like): 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
797 call_func = getattr(type(callable_obj), '__call__', None) # noqa: B004 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
798 if call_func is not None: 798 ↛ 801line 798 didn't jump to line 801 because the condition on line 798 was always true1stabcdefABCzyuvghijklDEFwxmnopqrGHI
799 callable_obj = call_func 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
801 hints = get_function_type_hints( 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
802 unwrap_wrapped_function(callable_obj),
803 include_keys={'return'},
804 globalns=globalns,
805 localns=localns,
806 )
807 return hints.get('return', PydanticUndefined) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
810def count_positional_required_params(sig: Signature) -> int: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
811 """Get the number of positional (required) arguments of a signature.
813 This function should only be used to inspect signatures of validation and serialization functions.
814 The first argument (the value being serialized or validated) is counted as a required argument
815 even if a default value exists.
817 Returns:
818 The number of positional arguments of a signature.
819 """
820 parameters = list(sig.parameters.values()) 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
821 return sum( 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
822 1
823 for param in parameters
824 if can_be_positional(param)
825 # First argument is the value being validated/serialized, and can have a default value
826 # (e.g. `float`, which has signature `(x=0, /)`). We assume other parameters (the info arg
827 # for instance) should be required, and thus without any default value.
828 and (param.default is Parameter.empty or param is parameters[0])
829 )
832def ensure_property(f: Any) -> Any: 1stabcdefABCzyuvghijklDEFPJKLMNOwxmnopqrGHI
833 """Ensure that a function is a `property` or `cached_property`, or is a valid descriptor.
835 Args:
836 f: The function to check.
838 Returns:
839 The function, or a `property` or `cached_property` instance wrapping the function.
840 """
841 if ismethoddescriptor(f) or isdatadescriptor(f): 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
842 return f 1stabcdefABCzyuvghijklDEFwxmnopqrGHI
843 else:
844 return property(f) 1stabcdefABCzyuvghijklDEFwxmnopqrGHI