Coverage for pydantic_graph/pydantic_graph/_utils.py: 93.85%

49 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-28 17:27 +0000

1from __future__ import annotations as _annotations 

2 

3import asyncio 

4import types 

5from functools import partial 

6from typing import Any, Callable, TypeVar 

7 

8from typing_extensions import ParamSpec, TypeIs, get_args, get_origin 

9from typing_inspection import typing_objects 

10from typing_inspection.introspection import is_union_origin 

11 

12 

def get_event_loop():
    """Return the current asyncio event loop, creating one if none is available.

    When `asyncio.get_event_loop()` raises `RuntimeError`, a fresh loop is created,
    installed as the current loop via `asyncio.set_event_loop`, and returned.
    """
    try:
        return asyncio.get_event_loop()
    except RuntimeError:
        # Fall through and build a loop ourselves.
        pass

    fresh_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(fresh_loop)
    return fresh_loop

20 

21 

def get_union_args(tp: Any) -> tuple[Any, ...]:
    """Return the members of `tp` if it is a union, otherwise a one-element tuple containing `tp`.

    If `tp` is a type alias (as reported by `typing_objects.is_typealiastype`), its
    `__value__` is inspected, and returned, in place of the alias itself.
    """
    # similar to `pydantic_ai_slim/pydantic_ai/_result.py:get_union_args`
    resolved = tp.__value__ if typing_objects.is_typealiastype(tp) else tp

    if not is_union_origin(get_origin(resolved)):
        return (resolved,)
    return get_args(resolved)

33 

34 

def unpack_annotated(tp: Any) -> tuple[Any, list[Any]]:
    """Separate an `Annotated` type into its underlying type and its annotations.

    Returns:
        `(tp, [])` if `tp` is not annotated, otherwise `(stripped type, annotations)`
        where `annotations` is the list of arguments following the first one.
    """
    if not typing_objects.is_annotated(get_origin(tp)):
        return tp, []

    stripped, *annotations = get_args(tp)
    return stripped, annotations

47 

48 

def comma_and(items: list[str]) -> str:
    """Join with a comma and 'and' for the last item.

    Args:
        items: The strings to join.

    Returns:
        `''` for an empty list, the sole item for a single-element list, and otherwise
        the items separated by `', '` with `', and '` before the last one, e.g.
        `'a, and b'` or `'a, b, and c'`.
    """
    if not items:
        # Nothing to join; previously this fell through to `items[-1]` and raised IndexError.
        return ''

    *leading, last = items
    if not leading:
        return last

    # oxford comma ¯\_(ツ)_/¯
    return ', '.join(leading) + ', and ' + last

56 

57 

def get_parent_namespace(frame: types.FrameType | None) -> dict[str, Any] | None:
    """Attempt to get the namespace where the graph was defined.

    The namespace is the `f_locals` of the frame that called `frame`. If the graph is defined
    with generics `Graph[a, b]` then another frame is inserted, and we have to skip that to get
    the correct namespace.

    Returns:
        The caller's local namespace, or `None` if `frame` is `None` or the walk runs out of
        calling frames.
    """
    current = frame
    while current is not None:
        caller = current.f_back
        if not caller:
            return None

        if caller.f_globals.get('__name__') != 'typing':
            return caller.f_locals

        # If the class calling this function is generic, explicitly parameterizing the class
        # results in a `typing._GenericAlias` instance, which proxies instantiation calls to the
        # "real" class and thus adding an extra frame to the call. To avoid pulling anything
        # from the `typing` module, step past it and look at the frame before:
        current = caller

    return None

74 

75 

class Unset:
    """Marker type for a value that has not been provided.

    The module-level `UNSET` instance is the sentinel that `is_set` checks against.

    Copied from pydantic_ai/_utils.py.
    """


# Stands for the "value is present" side of `T | Unset` in `is_set`.
T = TypeVar('T')
# The sentinel instance; `is_set` compares against it by identity.
UNSET = Unset()


def is_set(t_or_unset: T | Unset) -> TypeIs[T]:
    """Return `True` unless `t_or_unset` is the `UNSET` sentinel.

    The comparison is by identity, so only the `UNSET` object itself counts as unset.
    """
    return t_or_unset is not UNSET

91 

92 

93_P = ParamSpec('_P') 

94_R = TypeVar('_R') 

95 

96 

97async def run_in_executor(func: Callable[_P, _R], *args: _P.args, **kwargs: _P.kwargs) -> _R: 

98 if kwargs: 

99 # noinspection PyTypeChecker 

100 return await asyncio.get_running_loop().run_in_executor(None, partial(func, *args, **kwargs)) 

101 else: 

102 return await asyncio.get_running_loop().run_in_executor(None, func, *args) # type: ignore