Coverage for asyncer/_compat.py: 100%
15 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-09 01:12 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-09 01:12 +0000
1# AnyIO 4.1.0 renamed cancellable to abandon_on_cancel
2import importlib 1abcde
3import importlib.metadata 1abcde
4from typing import Callable, TypeVar, Union 1abcde
6import anyio 1abcde
7import anyio.to_thread 1abcde
8from anyio import CapacityLimiter 1abcde
9from typing_extensions import TypeVarTuple, Unpack 1abcde
# Generic placeholders used by the run_sync wrapper signatures:
# the wrapped callable's return type and its positional-argument types.
T_Retval = TypeVar("T_Retval")
PosArgsT = TypeVarTuple("PosArgsT")

# Version string of the installed "anyio" distribution, as reported by its
# package metadata (kept as a plain string).
ANYIO_VERSION = importlib.metadata.version("anyio")
def _release_tuple(version: str) -> tuple:
    """Return the leading numeric release components of *version* as ints.

    Parsing stops at the first component that is not purely numeric, keeping
    any leading digits of that component (``"4.1.0rc1"`` -> ``(4, 1, 0)``).
    The result is padded with zeros to at least three components so that
    ``"4.1"`` compares equal to ``"4.1.0"``.
    """
    release = []
    for piece in version.split("."):
        digits = ""
        for char in piece:
            if char not in "0123456789":
                break
            digits += char
        if not digits:
            break
        release.append(int(digits))
        if len(digits) != len(piece):
            # A suffix such as "rc1" or "post1" ends the numeric release part.
            break
    while len(release) < 3:
        release.append(0)
    return tuple(release)


# Compare numerically rather than as strings: a lexicographic comparison
# would rank e.g. "10.0.0" below "4.1.0" and select the wrong keyword.
if _release_tuple(ANYIO_VERSION) >= (4, 1, 0):

    async def run_sync(
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        *args: Unpack[PosArgsT],
        abandon_on_cancel: bool = False,
        limiter: Union[CapacityLimiter, None] = None,
    ) -> T_Retval:
        """Await ``anyio.to_thread.run_sync`` for ``func(*args)``.

        Variant for AnyIO 4.1.0 and newer, where the keyword is named
        ``abandon_on_cancel``; all arguments are forwarded unchanged.
        """
        return await anyio.to_thread.run_sync(
            func, *args, abandon_on_cancel=abandon_on_cancel, limiter=limiter
        )

else:

    async def run_sync(
        func: Callable[[Unpack[PosArgsT]], T_Retval],
        *args: Unpack[PosArgsT],
        abandon_on_cancel: bool = False,
        limiter: Union[CapacityLimiter, None] = None,
    ) -> T_Retval:
        """Await ``anyio.to_thread.run_sync`` for ``func(*args)``.

        Variant for AnyIO older than 4.1.0: the ``abandon_on_cancel`` value
        is passed under the previous keyword name ``cancellable``.
        """
        return await anyio.to_thread.run_sync(
            func, *args, cancellable=abandon_on_cancel, limiter=limiter
        )