Coverage for asyncer / _main.py: 100%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-05-05 09:41 +0000

1import functools 1abcdefg

2import sys 1abcdefg

3from collections.abc import Awaitable, Callable, Coroutine 1abcdefg

4from importlib import import_module 1abcdefg

5from typing import ( 1abcdefg

6 Any, 

7 Generic, 

8 ParamSpec, 

9 TypeVar, 

10) 

11from warnings import warn 1abcdefg

12 

13import anyio 1abcdefg

14import anyio.from_thread 1abcdefg

15import anyio.to_thread 1abcdefg

16import sniffio 1abcdefg

17from anyio._core._eventloop import threadlocals 1abcdefg

18from anyio.abc import TaskGroup as _TaskGroup 1abcdefg

19from asyncer._compat import run_sync 1abcdefg

20 

21 

22# This was obtained with: from anyio._core._eventloop import get_asynclib 

23# Removed in https://github.com/agronholm/anyio/pull/429 

24# Released in AnyIO 4.x.x 

25# The new function is anyio._core._eventloop.get_async_backend but that returns a 

26# class, not a module to extract the TaskGroup class from. 

27def get_asynclib(asynclib_name: str | None = None) -> Any: 1abcdefg

28 if asynclib_name is None: 1CovhDpwiEqxjFrykGszlHtAmIuBn

29 asynclib_name = sniffio.current_async_library() 1CovhDpwiEqxjFrykGszlHtAmIuBn

30 

31 modulename = "anyio._backends._" + asynclib_name 1CovhDpwiEqxjFrykGszlHtAmIuBn

32 try: 1CovhDpwiEqxjFrykGszlHtAmIuBn

33 return sys.modules[modulename] 1CovhDpwiEqxjFrykGszlHtAmIuBn

34 except KeyError: # pragma: no cover 

35 return import_module(modulename) 

36 

37 

# Return type of the callables wrapped by `runnify()`, `syncify()` and `asyncify()`.
T_Retval = TypeVar("T_Retval")
# Positional and keyword parameters of a wrapped callable, so that the wrappers
# built in this module expose the same call signature as the original.
T_ParamSpec = ParamSpec("T_ParamSpec")
# Type of the value stored in a `SoonValue` and produced by functions given to
# `TaskGroup.soonify()`.
T = TypeVar("T")

41 

42 

class PendingType:
    """Type of the `Pending` marker stored in a `SoonValue` that has no result yet."""

    def __repr__(self) -> str:
        # Fixed label, independent of the instance.
        label = "AsyncerPending"
        return label


# Module-level marker instance used as the initial content of every `SoonValue`.
Pending = PendingType()

49 

50 

class PendingValueException(Exception):
    """Raised by `SoonValue.value` when the task result is not available yet."""

53 

54 

class SoonValue(Generic[T]):
    """
    Holder for the return value of a task started with `TaskGroup.soonify()`.

    It starts out containing the `Pending` marker. The task wrapper created by
    `soonify()` replaces the marker with the real return value once the wrapped
    async function has finished.
    """

    def __init__(self) -> None:
        # Nothing has been produced yet: store the marker instead of a value.
        self._stored_value: T | PendingType = Pending

    @property
    def ready(self) -> bool:
        """`True` once a real value has replaced the pending marker."""
        return not isinstance(self._stored_value, PendingType)

    @property
    def value(self) -> T:
        """
        The stored return value.

        Raises `PendingValueException` while the pending marker is still stored.
        """
        stored = self._stored_value
        if not isinstance(stored, PendingType):
            return stored
        raise PendingValueException(
            "The return value of this task is still pending. Maybe you forgot to "
            "access it after the async with asyncer.create_task_group() block. "
            "If you need to access values of async tasks inside the same task "
            "group, you probably need a different approach, for example with "
            "AnyIO Streams."
        )

74 

75 

class TaskGroup(_TaskGroup):
    """`anyio.abc.TaskGroup` extended with the `soonify()` helper."""

    def soonify(
        self, async_function: Callable[T_ParamSpec, Awaitable[T]], name: object = None
    ) -> Callable[T_ParamSpec, SoonValue[T]]:
        """
        Build a function that, when called, starts `async_function` as a new task in
        this task group.

        The task is started with the regular `task_group.start_soon()` method. What
        `task_group.soonify()` adds is support for keyword arguments on top of
        positional ones, plus better autocompletion and inline errors for the
        arguments of the function being called.

        Use it like this:

        ```Python
        async with asyncer.create_task_group() as task_group:
            async def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
                # Do work
                return "Some result value"

            result = task_group.soonify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")

        print(result.value)
        ```

        Calling the returned function gives back a `SoonValue` object (`result` in
        the example above).

        Its attribute `soon_value.value` holds the return value of the original
        `async_function` *after* the `async with` block.

        Reading `soon_value.value` inside the `async with` block, before the actual
        return value is there, raises an `asyncer.PendingValueException`.

        If you think you need the return values inside the `async with` block,
        chances are high that a different approach fits better, for example an
        AnyIO Stream.

        Still, when the `async with` block contains checkpoints (some `await`), one
        or more `SoonValue` objects may already have their result before the block
        ends. You can check that with `soon_value.ready`. For example:

        ```Python
        async def do_work(name: str) -> str:
            return f"Hello {name}"

        async with asyncer.create_task_group() as task_group:
            result1 = task_group.soonify(do_work)(name="task 1")
            result2 = task_group.soonify(do_work)(name="task 2")
            await anyio.sleep(0)
            if result1.ready:
                print(result1.value)
            if result2.ready:
                print(result2.value)
        ```

        ## Arguments

        `async_function`: an async function to call soon
        `name`: name of the task, for the purposes of introspection and debugging

        ## Return

        A function that takes positional and keyword arguments and, when called,
        uses `task_group.start_soon()` to start the task in this task group.

        That function returns a `SoonValue` object that will hold the return value
        of the original function in `soon_value.value`.
        """

        @functools.wraps(async_function)
        def wrapper(
            *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
        ) -> SoonValue[T]:
            # Holder returned to the caller right away; the task fills it in later.
            soon_value: SoonValue[T] = SoonValue()
            # Bind all arguments now, so the task itself is started without any.
            bound_call = functools.partial(async_function, *args, **kwargs)

            @functools.wraps(bound_call)
            async def value_wrapper(*args: Any) -> None:
                soon_value._stored_value = await bound_call()

            self.start_soon(value_wrapper, name=name)
            return soon_value

        return wrapper

    # This is only for the return type annotation, but it won't really be called
    async def __aenter__(self) -> "TaskGroup":  # pragma: nocover
        """Enter the task group context and allow starting new tasks."""
        return await super().__aenter__()  # type: ignore

171 

172 

def create_task_group() -> "TaskGroup":
    """
    Create a task group for starting multiple concurrent tasks with async functions.

    Unlike `anyio.create_task_group()`, `asyncer.create_task_group()` returns an
    extended `TaskGroup` object that also provides the `task_group.soonify()` method.
    """
    # Task group class of the backend module returned by `get_asynclib()`.
    backend_module = get_asynclib()
    backend_task_group = backend_module.TaskGroup

    # Combine the backend implementation with the `soonify()` extension.
    class ExtendedTaskGroup(backend_task_group, TaskGroup):  # type: ignore[valid-type, misc]
        pass

    task_group = ExtendedTaskGroup()
    return task_group

188 

189 

def runnify(
    async_function: Callable[T_ParamSpec, Coroutine[Any, Any, T_Retval]],
    backend: str = "asyncio",
    backend_options: dict[str, Any] | None = None,
) -> Callable[T_ParamSpec, T_Retval]:
    """
    Turn an async function into a regular (blocking) function.

    The returned function accepts the same positional and keyword arguments as the
    original `async_function`. Calling it runs `async_function` with those
    arguments through `anyio.run()` and returns its return value.

    The current thread must not be already running an event loop.

    Use it like this:

    ```Python
    async def program(name: str) -> str:
        return f"Hello {name}"


    result = asyncer.runnify(program)(name="World")
    print(result)
    ```

    ## Arguments

    `async_function`: an async function to call
    `backend`: name of the asynchronous event loop implementation - currently either
        `asyncio` or `trio`
    `backend_options` keyword arguments to call the backend `run()` implementation with

    ## Return

    A blocking function that, when called, returns the return value of the async
    function

    ## Raises

    `RuntimeError`: if an asynchronous event loop is already running in this thread
    `LookupError`: if the named backend is not found
    """
    # Options forwarded unchanged to `anyio.run()` on every call of the wrapper.
    run_options: dict[str, Any] = {
        "backend": backend,
        "backend_options": backend_options,
    }

    @functools.wraps(async_function)
    def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:
        # `anyio.run()` is given a callable with every argument already bound.
        bound_call = functools.partial(async_function, *args, **kwargs)
        return anyio.run(bound_call, **run_options)

    return wrapper

242 

243 

def syncify(
    async_function: Callable[T_ParamSpec, Coroutine[Any, Any, T_Retval]],
    raise_sync_error: bool = True,
) -> Callable[T_ParamSpec, T_Retval]:
    """
    Take an async function and create a regular one that receives the same keyword and
    positional arguments, and that when called, calls the original async function in
    the main async loop from the worker thread using `anyio.from_thread.run()`.

    By default this is expected to be used from a worker thread. For example inside
    some function passed to `asyncify()`.

    But if you set `raise_sync_error` to `False`, you can also use this function
    in a non-async context: without an async event loop. For example, from a
    blocking/regular function called at the top level of a Python file. In that case,
    if it is not being called from inside a worker thread started from an async context
    (e.g. this is not called from a function that was called with `asyncify()`) it will
    run `async_function` in a new async event loop with `anyio.run()`.

    This functionality with `raise_sync_error` is there only to allow using
    `syncify()` in codebases that are used by async code in some cases and by blocking
    code in others. For example, during migrations from blocking code to async code.

    Internally, `asyncer.syncify()` uses the same `anyio.from_thread.run()`, but it
    supports keyword arguments additional to positional arguments and it adds better
    support for tooling (e.g. editor autocompletion and inline errors) for the
    arguments and return value of the function.

    Use it like this:

    ```Python
    async def do_work(arg1, arg2, kwarg1="", kwarg2=""):
        # Do work

    result = asyncer.syncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
    ```

    ## Arguments

    `async_function`: an async function to be called in the main thread, in the async
        event loop
    `raise_sync_error`: If set to `False`, when used in a non-async context (without
        an async event loop), it will run `async_function` in a new async event loop,
        instead of raising an exception. In every other case the call is handed to
        `anyio.from_thread.run()`.

    ## Return

    A regular blocking function that takes the same positional and keyword arguments
    as the original async one, that when called runs the same original function in
    the main async loop when called from a worker thread and returns the result.
    """

    @functools.wraps(async_function)
    def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:
        # Look for the per-thread marker on AnyIO's `threadlocals`. The attribute
        # name differs between AnyIO versions, so each known name is tried in turn;
        # the result is `None` when none of them is set.
        current_async_module = (
            getattr(threadlocals, "current_token", None)
            or
            # TODO: remove when deprecating AnyIO 4.10.0
            getattr(threadlocals, "current_async_backend", None)
            or
            # TODO: remove when deprecating AnyIO 3.x
            getattr(threadlocals, "current_async_module", None)
        )
        partial_f = functools.partial(async_function, *args, **kwargs)
        # No marker and the caller opted out of the error: run in a fresh event loop.
        if current_async_module is None and raise_sync_error is False:
            return anyio.run(partial_f)
        return anyio.from_thread.run(partial_f)

    return wrapper

313 

314 

def asyncify(
    function: Callable[T_ParamSpec, T_Retval],
    *,
    abandon_on_cancel: bool = False,
    cancellable: bool | None = None,
    limiter: anyio.CapacityLimiter | None = None,
) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
    """
    Take a blocking function and create an async one that receives the same
    positional and keyword arguments, and that when called, calls the original function
    in a worker thread using `anyio.to_thread.run_sync()`. Internally,
    `asyncer.asyncify()` uses the same `anyio.to_thread.run_sync()`, but it supports
    keyword arguments additional to positional arguments and it adds better support for
    autocompletion and inline errors for the arguments of the function called and the
    return value.

    If the `abandon_on_cancel` option is enabled and the task waiting for its
    completion is cancelled, the thread will still run its course but its return value
    (or any raised exception) will be ignored.

    Use it like this:

    ```Python
    def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
        # Do work
        return "Some result"

    result = await asyncer.asyncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
    print(result)
    ```

    ## Arguments

    `function`: a blocking regular callable (e.g. a function)
    `abandon_on_cancel`: `True` to abandon the thread (leaving it to run its course
        unattended) when the task waiting for it is cancelled
    `cancellable`: deprecated alias of `abandon_on_cancel`. When it is not `None`, its
        value replaces `abandon_on_cancel` and a `DeprecationWarning` is emitted
    `limiter`: capacity limiter to use to limit the total amount of threads running
        (if omitted, the default limiter is used)

    ## Return

    An async function that takes the same positional and keyword arguments as the
    original one, that when called runs the same original function in a thread worker
    and returns the result.
    """
    if cancellable is not None:
        # Honor the deprecated keyword, but tell the caller to migrate.
        abandon_on_cancel = cancellable
        warn(
            "The `cancellable=` keyword argument to `asyncer.asyncify()` is "
            "deprecated since Asyncer 0.0.8, following AnyIO 4.1.0. "
            "Use `abandon_on_cancel=` instead.",
            DeprecationWarning,
            stacklevel=2,
        )

    @functools.wraps(function)
    async def wrapper(
        *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
    ) -> T_Retval:
        # Bind every argument up front; `run_sync` receives a ready-to-call object.
        partial_f = functools.partial(function, *args, **kwargs)

        return await run_sync(
            partial_f, abandon_on_cancel=abandon_on_cancel, limiter=limiter
        )

    return wrapper