Coverage for fastapi/concurrency.py: 100%

18 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-08-08 03:53 +0000

1from contextlib import asynccontextmanager as asynccontextmanager 1abcde

2from typing import AsyncGenerator, ContextManager, TypeVar 1abcde

3 

4import anyio 1abcde

5from anyio import CapacityLimiter 1abcde

6from starlette.concurrency import iterate_in_threadpool as iterate_in_threadpool # noqa 1abcde

7from starlette.concurrency import run_in_threadpool as run_in_threadpool # noqa 1abcde

8from starlette.concurrency import ( # noqa 1abcde

9 run_until_first_complete as run_until_first_complete, 

10) 

11 

12_T = TypeVar("_T") 1abcde

13 

14 

@asynccontextmanager
async def contextmanager_in_threadpool(
    cm: ContextManager[_T],
) -> AsyncGenerator[_T, None]:
    """Drive a synchronous context manager from async code via worker threads.

    ``cm.__enter__`` and ``cm.__exit__`` are each executed in a worker thread
    so that a blocking context manager does not block the event loop.

    Args:
        cm: The synchronous context manager to enter and exit.

    Yields:
        The value returned by ``cm.__enter__()``.

    Raises:
        Exception: Any exception raised inside the ``async with`` body is
            re-raised unless ``cm.__exit__`` returns a truthy value.

    Note:
        Only ``Exception`` subclasses are forwarded to ``cm.__exit__``; other
        ``BaseException`` subclasses propagate without ``__exit__`` running.
    """
    # If __exit__ had to wait for a free worker thread, it could race or
    # deadlock with a context manager that owns its own internal pool (e.g. a
    # database connection pool). To avoid that, __exit__ runs under its own
    # limiter instead of the shared one. Because a new limiter is created for
    # every call, any non-zero limit works (1 is arbitrary).
    exit_limiter = CapacityLimiter(1)

    # Enter outside the try block: as with a regular ``with`` statement,
    # __exit__ must not be called when __enter__ itself fails.
    entered = await run_in_threadpool(cm.__enter__)
    try:
        yield entered
    except Exception as e:
        # Hand the real traceback to __exit__; passing None hides it from the
        # context manager and can strip it from the exception.
        suppressed = bool(
            await anyio.to_thread.run_sync(
                cm.__exit__, type(e), e, e.__traceback__, limiter=exit_limiter
            )
        )
        if not suppressed:
            # Bare raise re-raises the exception being handled unchanged.
            raise
    else:
        await anyio.to_thread.run_sync(
            cm.__exit__, None, None, None, limiter=exit_limiter
        )