Coverage for faststream / _internal / utils / functions.py: 86%

31 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from collections.abc import AsyncIterator, Awaitable, Callable 

3from concurrent.futures import Executor 

4from contextlib import asynccontextmanager 

5from functools import partial, wraps 

6from typing import ( 

7 TYPE_CHECKING, 

8 Any, 

9 Optional, 

10 TypeVar, 

11 cast, 

12 overload, 

13) 

14 

15from fast_depends.utils import ( 

16 is_coroutine_callable, 

17 run_async as call_or_await, 

18 run_in_threadpool, 

19) 

20from typing_extensions import ParamSpec, Self 

21 

22from faststream._internal.basic_types import F_Return, F_Spec 

23 

24if TYPE_CHECKING: 

25 from types import TracebackType 

26 

27__all__ = ( 

28 "call_or_await", 

29 "fake_context", 

30 "to_async", 

31) 

32 

33P = ParamSpec("P") 

34T = TypeVar("T") 

35 

36 

37@overload 

38def to_async( 

39 func: Callable[F_Spec, Awaitable[F_Return]], 

40) -> Callable[F_Spec, Awaitable[F_Return]]: ... 

41 

42 

43@overload 

44def to_async( 

45 func: Callable[F_Spec, F_Return], 

46) -> Callable[F_Spec, Awaitable[F_Return]]: ... 

47 

48 

49def to_async( 

50 func: Callable[F_Spec, F_Return] | Callable[F_Spec, Awaitable[F_Return]], 

51) -> Callable[F_Spec, Awaitable[F_Return]]: 

52 """Converts a synchronous function to an asynchronous function.""" 

53 if is_coroutine_callable(func): 

54 return cast("Callable[F_Spec, Awaitable[F_Return]]", func) 

55 

56 func = cast("Callable[F_Spec, F_Return]", func) 

57 

58 @wraps(func) 

59 async def to_async_wrapper(*args: F_Spec.args, **kwargs: F_Spec.kwargs) -> F_Return: 

60 """Wraps a function to make it asynchronous.""" 

61 return await run_in_threadpool(func, *args, **kwargs) 

62 

63 return to_async_wrapper 

64 

65 

66@asynccontextmanager 

67async def fake_context(*args: Any, **kwargs: Any) -> AsyncIterator[None]: 

68 yield None 

69 

70 

71class FakeContext: 

72 def __init__(self, *args: Any, **kwargs: Any) -> None: 

73 pass 

74 

75 def __enter__(self) -> Self: 

76 return self 

77 

78 def __exit__( 

79 self, 

80 exc_type: type[BaseException] | None = None, 

81 exc_val: BaseException | None = None, 

82 exc_tb: Optional["TracebackType"] = None, 

83 ) -> None: 

84 if exc_val: 

85 raise exc_val 

86 

87 async def __aenter__(self) -> Self: 

88 return self 

89 

90 async def __aexit__( 

91 self, 

92 exc_type: type[BaseException] | None = None, 

93 exc_val: BaseException | None = None, 

94 exc_tb: Optional["TracebackType"] = None, 

95 ) -> None: 

96 if exc_val: 

97 raise exc_val 

98 

99 

100async def return_input(x: Any) -> Any: 

101 return x 

102 

103 

104async def run_in_executor( 

105 executor: Executor | None, 

106 func: Callable[P, T], 

107 *args: P.args, 

108 **kwargs: P.kwargs, 

109) -> T: 

110 loop = asyncio.get_running_loop() 

111 return await loop.run_in_executor(executor, partial(func, *args, **kwargs))