Coverage for fastapi/concurrency.py: 100%
18 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-08 03:53 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-08-08 03:53 +0000
1from contextlib import asynccontextmanager as asynccontextmanager 1abcde
2from typing import AsyncGenerator, ContextManager, TypeVar 1abcde
4import anyio 1abcde
5from anyio import CapacityLimiter 1abcde
6from starlette.concurrency import iterate_in_threadpool as iterate_in_threadpool # noqa 1abcde
7from starlette.concurrency import run_in_threadpool as run_in_threadpool # noqa 1abcde
8from starlette.concurrency import ( # noqa 1abcde
9 run_until_first_complete as run_until_first_complete,
10)
12_T = TypeVar("_T") 1abcde
15@asynccontextmanager 1abcde
16async def contextmanager_in_threadpool( 1abcde
17 cm: ContextManager[_T],
18) -> AsyncGenerator[_T, None]:
19 # blocking __exit__ from running waiting on a free thread
20 # can create race conditions/deadlocks if the context manager itself
21 # has its own internal pool (e.g. a database connection pool)
22 # to avoid this we let __exit__ run without a capacity limit
23 # since we're creating a new limiter for each call, any non-zero limit
24 # works (1 is arbitrary)
25 exit_limiter = CapacityLimiter(1) 1abcde
26 try: 1abcde
27 yield await run_in_threadpool(cm.__enter__) 1abcde
28 except Exception as e: 1abcde
29 ok = bool( 1abcde
30 await anyio.to_thread.run_sync(
31 cm.__exit__, type(e), e, None, limiter=exit_limiter
32 )
33 )
34 if not ok: 1abcde
35 raise e 1abcde
36 else:
37 await anyio.to_thread.run_sync( 1abcde
38 cm.__exit__, None, None, None, limiter=exit_limiter
39 )