Coverage for faststream / _internal / utils / functions.py: 86%
31 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from collections.abc import AsyncIterator, Awaitable, Callable
3from concurrent.futures import Executor
4from contextlib import asynccontextmanager
5from functools import partial, wraps
6from typing import (
7 TYPE_CHECKING,
8 Any,
9 Optional,
10 TypeVar,
11 cast,
12 overload,
13)
15from fast_depends.utils import (
16 is_coroutine_callable,
17 run_async as call_or_await,
18 run_in_threadpool,
19)
20from typing_extensions import ParamSpec, Self
22from faststream._internal.basic_types import F_Return, F_Spec
24if TYPE_CHECKING:
25 from types import TracebackType
# Names exported from this module by a star-import.
__all__ = ("call_or_await", "fake_context", "to_async")
33P = ParamSpec("P")
34T = TypeVar("T")
@overload
def to_async(
    func: Callable[F_Spec, Awaitable[F_Return]],
) -> Callable[F_Spec, Awaitable[F_Return]]: ...


@overload
def to_async(
    func: Callable[F_Spec, F_Return],
) -> Callable[F_Spec, Awaitable[F_Return]]: ...


def to_async(
    func: Callable[F_Spec, F_Return] | Callable[F_Spec, Awaitable[F_Return]],
) -> Callable[F_Spec, Awaitable[F_Return]]:
    """Return a version of ``func`` whose call result can be awaited.

    A callable that ``is_coroutine_callable`` accepts is handed back
    unchanged. Any other callable is wrapped in a coroutine function that
    awaits ``run_in_threadpool(func, *args, **kwargs)``; the wrapper copies
    the metadata of ``func`` via ``functools.wraps``.
    """
    if not is_coroutine_callable(func):
        sync_func = cast("Callable[F_Spec, F_Return]", func)

        @wraps(sync_func)
        async def to_async_wrapper(
            *args: F_Spec.args,
            **kwargs: F_Spec.kwargs,
        ) -> F_Return:
            """Await the wrapped callable through ``run_in_threadpool``."""
            return await run_in_threadpool(sync_func, *args, **kwargs)

        return to_async_wrapper

    # Already a coroutine callable: only the static type is narrowed.
    return cast("Callable[F_Spec, Awaitable[F_Return]]", func)
@asynccontextmanager
async def fake_context(*args: Any, **kwargs: Any) -> AsyncIterator[None]:
    """Async context manager that does nothing.

    Every positional and keyword argument is accepted and ignored; the
    ``async with ... as`` target receives ``None``.
    """
    yield
71class FakeContext:
72 def __init__(self, *args: Any, **kwargs: Any) -> None:
73 pass
75 def __enter__(self) -> Self:
76 return self
78 def __exit__(
79 self,
80 exc_type: type[BaseException] | None = None,
81 exc_val: BaseException | None = None,
82 exc_tb: Optional["TracebackType"] = None,
83 ) -> None:
84 if exc_val:
85 raise exc_val
87 async def __aenter__(self) -> Self:
88 return self
90 async def __aexit__(
91 self,
92 exc_type: type[BaseException] | None = None,
93 exc_val: BaseException | None = None,
94 exc_tb: Optional["TracebackType"] = None,
95 ) -> None:
96 if exc_val:
97 raise exc_val
async def return_input(x: Any) -> Any:
    """Return ``x`` unchanged; an identity function in coroutine form."""
    return x
async def run_in_executor(
    executor: Executor | None,
    func: Callable[P, T],
    *args: P.args,
    **kwargs: P.kwargs,
) -> T:
    """Call ``func(*args, **kwargs)`` via the running loop's ``run_in_executor``.

    Args:
        executor: Executor handed to ``loop.run_in_executor``; ``None`` is
            passed through unchanged.
        func: Callable to invoke.
        *args: Positional arguments bound to ``func``.
        **kwargs: Keyword arguments bound to ``func``.

    Returns:
        Whatever ``func`` returns.
    """
    running_loop = asyncio.get_running_loop()
    # ``loop.run_in_executor`` forwards positional arguments only, so the
    # whole call (keywords included) is bound up front.
    bound_call = partial(func, *args, **kwargs)
    pending = running_loop.run_in_executor(executor, bound_call)
    return await pending