Coverage for pydantic_graph/pydantic_graph/_utils.py: 93.85%
49 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-28 17:27 +0000
from __future__ import annotations as _annotations

import asyncio
import types
from functools import partial
from typing import Any, Callable, TypeVar

from typing_extensions import ParamSpec, TypeIs, get_args, get_origin
from typing_inspection import typing_objects
from typing_inspection.introspection import is_union_origin
def get_event_loop() -> asyncio.AbstractEventLoop:
    """Return the current event loop, creating and installing a new one if needed.

    Returns:
        The loop returned by `asyncio.get_event_loop()`. If that call raises `RuntimeError`,
        a new loop is created, set as the current event loop, and returned instead.
    """
    try:
        event_loop = asyncio.get_event_loop()
    except RuntimeError:
        # no current loop could be obtained, so create one and register it for later calls
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
    return event_loop
def get_union_args(tp: Any) -> tuple[Any, ...]:
    """Extract the arguments of a Union type if `tp` is a union, otherwise return `(tp,)`.

    If `tp` is a type alias type, its `__value__` is inspected instead of the alias itself.
    """
    # similar to `pydantic_ai_slim/pydantic_ai/_result.py:get_union_args`
    if typing_objects.is_typealiastype(tp):
        tp = tp.__value__

    origin = get_origin(tp)
    if is_union_origin(origin):
        return get_args(tp)
    else:
        # not a union: wrap the type itself so callers always get a tuple of types
        return (tp,)
def unpack_annotated(tp: Any) -> tuple[Any, list[Any]]:
    """Strip `Annotated` from the type if present.

    Returns:
        `(tp, [])` if not annotated, otherwise `(stripped type, annotations)` where
        `annotations` is the list of metadata arguments that followed the type.
    """
    origin = get_origin(tp)
    if typing_objects.is_annotated(origin):
        # the first argument of `Annotated[...]` is the wrapped type, the rest is metadata
        inner_tp, *args = get_args(tp)
        return inner_tp, args
    else:
        return tp, []
def comma_and(items: list[str]) -> str:
    """Join with a comma and 'and' for the last item.

    Returns:
        An empty string for an empty list, the item itself for a single item, otherwise
        all items joined by `', '` with `'and '` placed before the last one.
    """
    if not items:
        # nothing to join; without this guard `items[-1]` below raises `IndexError`
        return ''
    if len(items) == 1:
        return items[0]
    # oxford comma ¯\_(ツ)_/¯ (also applied when there are only two items)
    return ', '.join(items[:-1]) + ', and ' + items[-1]
def get_parent_namespace(frame: types.FrameType | None) -> dict[str, Any] | None:
    """Attempt to get the namespace where the graph was defined.

    If the graph is defined with generics `Graph[a, b]` then another frame is inserted, and we have to skip that
    to get the correct namespace.

    Args:
        frame: The frame whose caller's namespace is wanted, or `None`.

    Returns:
        The `f_locals` of the nearest calling frame whose module `__name__` is not `'typing'`,
        or `None` if `frame` is `None` or no such calling frame exists.
    """
    while frame is not None:
        back = frame.f_back
        if back is None:
            return None
        if back.f_globals.get('__name__') != 'typing':
            return back.f_locals
        # If the class calling this function is generic, explicitly parameterizing the class
        # results in a `typing._GenericAlias` instance, which proxies instantiation calls to the
        # "real" class and thus adding an extra frame to the call. To avoid pulling anything
        # from the `typing` module, use the correct frame (the one before):
        frame = back
    return None
class Unset:
    """A singleton to represent an unset value.

    Copied from pydantic_ai/_utils.py.
    """

    pass


# the shared sentinel instance that `is_set` compares against
UNSET = Unset()
T = TypeVar('T')


def is_set(t_or_unset: T | Unset) -> TypeIs[T]:
    """Return `True` unless `t_or_unset` is the `UNSET` sentinel.

    The check is by identity, so only the module-level `UNSET` object counts as unset.
    """
    return t_or_unset is not UNSET
93_P = ParamSpec('_P')
94_R = TypeVar('_R')
97async def run_in_executor(func: Callable[_P, _R], *args: _P.args, **kwargs: _P.kwargs) -> _R:
98 if kwargs:
99 # noinspection PyTypeChecker
100 return await asyncio.get_running_loop().run_in_executor(None, partial(func, *args, **kwargs))
101 else:
102 return await asyncio.get_running_loop().run_in_executor(None, func, *args) # type: ignore