Coverage for asyncer/_compat.py: 100%

15 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-09 01:12 +0000

1# AnyIO 4.1.0 renamed cancellable to abandon_on_cancel 

2import importlib 1abcde

3import importlib.metadata 1abcde

4from typing import Callable, TypeVar, Union 1abcde

5 

6import anyio 1abcde

7import anyio.to_thread 1abcde

8from anyio import CapacityLimiter 1abcde

9from typing_extensions import TypeVarTuple, Unpack 1abcde

10 

11ANYIO_VERSION = importlib.metadata.version("anyio") 1abcde

12 

13T_Retval = TypeVar("T_Retval") 1abcde

14PosArgsT = TypeVarTuple("PosArgsT") 1abcde

15 

16if ANYIO_VERSION >= "4.1.0": 1abcde

17 

18 async def run_sync( 1ab

19 func: Callable[[Unpack[PosArgsT]], T_Retval], 

20 *args: Unpack[PosArgsT], 

21 abandon_on_cancel: bool = False, 

22 limiter: Union[CapacityLimiter, None] = None, 

23 ) -> T_Retval: 

24 return await anyio.to_thread.run_sync( 1fghijklmnopq

25 func, *args, abandon_on_cancel=abandon_on_cancel, limiter=limiter 

26 ) 

27else: 

28 

29 async def run_sync( 1cde

30 func: Callable[[Unpack[PosArgsT]], T_Retval], 

31 *args: Unpack[PosArgsT], 

32 abandon_on_cancel: bool = False, 

33 limiter: Union[CapacityLimiter, None] = None, 

34 ) -> T_Retval: 

35 return await anyio.to_thread.run_sync( 1rstuvwxyzABCDEFGHI

36 func, *args, cancellable=abandon_on_cancel, limiter=limiter 

37 )