Coverage for fastapi/concurrency.py: 100%

18 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-05-05 00:03 +0000

1from contextlib import asynccontextmanager as asynccontextmanager 1abcdef

2from typing import AsyncGenerator, ContextManager, TypeVar 1abcdef

3 

4import anyio.to_thread 1abcdef

5from anyio import CapacityLimiter 1abcdef

6from starlette.concurrency import iterate_in_threadpool as iterate_in_threadpool # noqa 1abcdef

7from starlette.concurrency import run_in_threadpool as run_in_threadpool # noqa 1abcdef

8from starlette.concurrency import ( # noqa 1abcdef

9 run_until_first_complete as run_until_first_complete, 

10) 

11 

12_T = TypeVar("_T") 1abcdef

13 

14 

15@asynccontextmanager 1abcdef

16async def contextmanager_in_threadpool( 1abcdef

17 cm: ContextManager[_T], 

18) -> AsyncGenerator[_T, None]: 

19 # blocking __exit__ from running waiting on a free thread 

20 # can create race conditions/deadlocks if the context manager itself 

21 # has its own internal pool (e.g. a database connection pool) 

22 # to avoid this we let __exit__ run without a capacity limit 

23 # since we're creating a new limiter for each call, any non-zero limit 

24 # works (1 is arbitrary) 

25 exit_limiter = CapacityLimiter(1) 2tbs t u ubv w x vby wbz obbbxbA cbybB C D g h zbE F G AbH I J BbK CbL dbebDbM fbEbN O P i j FbQ R S GbT U V HbW IbX pbgbJbY hbKbZ 0 1 k l Lb2 3 4 Mb5 6 7 Nb8 Ob9 qbibPb! jbQb# $ % m n Rb' ( ) Sb* + , Tb- Ub. rbkbVb/ lbWb: ; = o p Xb? @ [ Yb] ^ _ Zb` 0b{ sbmb1b| nb2b} ~ abq r

26 try: 2tbs t u ubv w x vby wbz obbbxbA cbybB C D g h zbE F G AbH I J BbK CbL dbebDbM fbEbN O P i j FbQ R S GbT U V HbW IbX pbgbJbY hbKbZ 0 1 k l Lb2 3 4 Mb5 6 7 Nb8 Ob9 qbibPb! jbQb# $ % m n Rb' ( ) Sb* + , Tb- Ub. rbkbVb/ lbWb: ; = o p Xb? @ [ Yb] ^ _ Zb` 0b{ sbmb1b| nb2b} ~ abq r

27 yield await run_in_threadpool(cm.__enter__) 2tbs t u ubv w x vby wbz obbbxbA cbybB C D g h zbE F G AbH I J BbK CbL dbebDbM fbEbN O P i j FbQ R S GbT U V HbW IbX pbgbJbY hbKbZ 0 1 k l Lb2 3 4 Mb5 6 7 Nb8 Ob9 qbibPb! jbQb# $ % m n Rb' ( ) Sb* + , Tb- Ub. rbkbVb/ lbWb: ; = o p Xb? @ [ Yb] ^ _ Zb` 0b{ sbmb1b| nb2b} ~ abq r

28 except Exception as e: 2s t u v w x y z obbbA cbB C D g h E F G H I J K L dbebM fbN O P i j Q R S T U V W X pbgbY hbZ 0 1 k l 2 3 4 5 6 7 8 9 qbib! jb# $ % m n ' ( ) * + , - . rbkb/ lb: ; = o p ? @ [ ] ^ _ ` { sbmb| nb} ~ abq r

29 ok = bool( 2s t u v w x y z obbbA cbB C D g h E F G H I J K L dbebM fbN O P i j Q R S T U V W X pbgbY hbZ 0 1 k l 2 3 4 5 6 7 8 9 qbib! jb# $ % m n ' ( ) * + , - . rbkb/ lb: ; = o p ? @ [ ] ^ _ ` { sbmb| nb} ~ abq r

30 await anyio.to_thread.run_sync( 

31 cm.__exit__, type(e), e, e.__traceback__, limiter=exit_limiter 

32 ) 

33 ) 

34 if not ok: 2s t u v w x y z bbA cbB C D g h E F G H I J K L ebM fbN O P i j Q R S T U V W X gbY hbZ 0 1 k l 2 3 4 5 6 7 8 9 ib! jb# $ % m n ' ( ) * + , - . kb/ lb: ; = o p ? @ [ ] ^ _ ` { mb| nb} ~ abq r

35 raise e 2s t u v w x y z A B C D g h E F G H I J K L dbM N O P i j Q R S T U V W X Y Z 0 1 k l 2 3 4 5 6 7 8 9 ! # $ % m n ' ( ) * + , - . / : ; = o p ? @ [ ] ^ _ ` { | } ~ abq r

36 else: 

37 await anyio.to_thread.run_sync( 2tbubvbwbxbybg h zbAbBbCbDbEbi j FbGbHbIbJbKbk l LbMbNbObPbQbm n RbSbTbUbVbWbo p XbYbZb0b1b2bq r

38 cm.__exit__, None, None, None, limiter=exit_limiter 

39 )