Coverage for asyncer / _main.py: 100%
81 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-05 09:41 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-05-05 09:41 +0000
1import functools 1abcdefg
2import sys 1abcdefg
3from collections.abc import Awaitable, Callable, Coroutine 1abcdefg
4from importlib import import_module 1abcdefg
5from typing import ( 1abcdefg
6 Any,
7 Generic,
8 ParamSpec,
9 TypeVar,
10)
11from warnings import warn 1abcdefg
13import anyio 1abcdefg
14import anyio.from_thread 1abcdefg
15import anyio.to_thread 1abcdefg
16import sniffio 1abcdefg
17from anyio._core._eventloop import threadlocals 1abcdefg
18from anyio.abc import TaskGroup as _TaskGroup 1abcdefg
19from asyncer._compat import run_sync 1abcdefg
22# This was obtained with: from anyio._core._eventloop import get_asynclib
23# Removed in https://github.com/agronholm/anyio/pull/429
24# Released in AnyIO 4.x.x
25# The new function is anyio._core._eventloop.get_async_backend but that returns a
26# class, not a module to extract the TaskGroup class from.
27def get_asynclib(asynclib_name: str | None = None) -> Any: 1abcdefg
28 if asynclib_name is None: 1CovhDpwiEqxjFrykGszlHtAmIuBn
29 asynclib_name = sniffio.current_async_library() 1CovhDpwiEqxjFrykGszlHtAmIuBn
31 modulename = "anyio._backends._" + asynclib_name 1CovhDpwiEqxjFrykGszlHtAmIuBn
32 try: 1CovhDpwiEqxjFrykGszlHtAmIuBn
33 return sys.modules[modulename] 1CovhDpwiEqxjFrykGszlHtAmIuBn
34 except KeyError: # pragma: no cover
35 return import_module(modulename)
38T_Retval = TypeVar("T_Retval") 1abcdefg
39T_ParamSpec = ParamSpec("T_ParamSpec") 1abcdefg
40T = TypeVar("T") 1abcdefg
43class PendingType: 1abcdefg
44 def __repr__(self) -> str: 1abcdefg
45 return "AsyncerPending" 2` { | } ~ abbb
48Pending = PendingType() 1abcdefg
51class PendingValueException(Exception): 1abcdefg
52 pass 1abcdefg
55class SoonValue(Generic[T]): 1abcdefg
56 def __init__(self) -> None: 1abcdefg
57 self._stored_value: T | PendingType = Pending 1CovhDpwiEqxjFrykGszlHtAmIuBn
59 @property 1abcdefg
60 def value(self) -> T: 1abcdefg
61 if isinstance(self._stored_value, PendingType): 1ovhpwiqxjrykszltAmuBn
62 raise PendingValueException( 1vwxyzAB
63 "The return value of this task is still pending. Maybe you forgot to "
64 "access it after the async with asyncer.create_task_group() block. "
65 "If you need to access values of async tasks inside the same task "
66 "group, you probably need a different approach, for example with "
67 "AnyIO Streams."
68 )
69 return self._stored_value 1ohpiqjrksltmun
71 @property 1abcdefg
72 def ready(self) -> bool: 1abcdefg
73 return not isinstance(self._stored_value, PendingType) 1hijklmn
76class TaskGroup(_TaskGroup): 1abcdefg
77 def soonify( 1abcdefg
78 self, async_function: Callable[T_ParamSpec, Awaitable[T]], name: object = None
79 ) -> Callable[T_ParamSpec, SoonValue[T]]:
80 """
81 Create and return a function that when called will start a new task in this
82 task group.
84 Internally it uses the same `task_group.start_soon()` method. But
85 `task_group.soonify()` supports keyword arguments additional to positional
86 arguments and it adds better support for autocompletion and inline errors
87 for the arguments of the function called.
89 Use it like this:
91 ```Python
92 async with asyncer.create_task_group() as task_group:
93 async def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
94 # Do work
95 return "Some result value"
97 result = task_group.soonify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
99 print(result.value)
100 ```
102 The return value from that function (`result` in the example) is an object
103 `SoonValue`.
105 This `SoonValue` object has an attribute `soon_value.value` that will hold the
106 return value of the original `async_function` *after* the `async with` block.
108 If you try to access the `soon_value.value` inside the `async with` block,
109 before it has the actual return value, it will raise a an exception
110 `asyncer.PendingValueException`.
112 If you think you need to access the return values inside the `async with` block,
113 there's a high chance that you really need a different approach, for example
114 using an AnyIO Stream.
116 But either way, if you have checkpoints inside the `async with` block (you have
117 some `await` there), one or more of the `SoonValue` objects you might have
118 could end up having the result value ready before ending the `async with` block.
119 You can check that with `soon_value.ready`. For example:
121 ```Python
122 async def do_work(name: str) -> str:
123 return f"Hello {name}"
125 async with asyncer.create_task_group() as task_group:
126 result1 = task_group.soonify(do_work)(name="task 1")
127 result2 = task_group.soonify(do_work)(name="task 2")
128 await anyio.sleep(0)
129 if result1.ready:
130 print(result1.value)
131 if result2.ready:
132 print(result2.value)
133 ```
136 ## Arguments
138 `async_function`: an async function to call soon
139 `name`: name of the task, for the purposes of introspection and debugging
141 ## Return
143 A function that takes positional and keyword arguments and when called
144 uses `task_group.start_soon()` to start the task in this task group.
146 That function returns a `SoonValue` object holding the return value of the
147 original function in `soon_value.value`.
148 """
150 @functools.wraps(async_function) 1CovhDpwiEqxjFrykGszlHtAmIuBn
151 def wrapper( 1CovhDpwiEqxjFrykGszlHtAmIuBn
152 *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
153 ) -> SoonValue[T]:
154 partial_f = functools.partial(async_function, *args, **kwargs) 1CovhDpwiEqxjFrykGszlHtAmIuBn
155 soon_value: SoonValue[T] = SoonValue() 1CovhDpwiEqxjFrykGszlHtAmIuBn
157 @functools.wraps(partial_f) 1CovhDpwiEqxjFrykGszlHtAmIuBn
158 async def value_wrapper(*args: Any) -> None: 1CovhDpwiEqxjFrykGszlHtAmIuBn
159 value = await partial_f() 1CovhDpwiEqxjFrykGszlHtAmIuBn
160 soon_value._stored_value = value 1CohDpiEqjFrkGslHtmIun
162 self.start_soon(value_wrapper, name=name) 1CovhDpwiEqxjFrykGszlHtAmIuBn
163 return soon_value 1CovhDpwiEqxjFrykGszlHtAmIuBn
165 return wrapper 1CovhDpwiEqxjFrykGszlHtAmIuBn
167 # This is only for the return type annotation, but it won't really be called
168 async def __aenter__(self) -> "TaskGroup": # pragma: nocover 1abcdefg
169 """Enter the task group context and allow starting new tasks."""
170 return await super().__aenter__() # type: ignore
173def create_task_group() -> "TaskGroup": 1abcdefg
174 """
175 Create a task group used to start multiple concurrent tasks with async functions.
177 `asyncer.create_task_group()` is different from `anyio.create_task_group()` in that
178 it creates an extended `TaskGroup` object that includes the `task_group.soonify()`
179 method.
180 """
182 LibTaskGroup = get_asynclib().TaskGroup 1CovhDpwiEqxjFrykGszlHtAmIuBn
184 class ExtendedTaskGroup(LibTaskGroup, TaskGroup): # type: ignore[valid-type, misc] 1CovhDpwiEqxjFrykGszlHtAmIuBn
185 pass 1CovhDpwiEqxjFrykGszlHtAmIuBn
187 return ExtendedTaskGroup() 1CovhDpwiEqxjFrykGszlHtAmIuBn
190def runnify( 1abcdefg
191 async_function: Callable[T_ParamSpec, Coroutine[Any, Any, T_Retval]],
192 backend: str = "asyncio",
193 backend_options: dict[str, Any] | None = None,
194) -> Callable[T_ParamSpec, T_Retval]:
195 """
196 Take an async function and create a regular (blocking) function that receives the
197 same keyword and positional arguments for the original async function, and that when
198 called will create an event loop and use it to run the original `async_function`
199 with those arguments.
201 That function returns the return value from the original `async_function`.
203 The current thread must not be already running an event loop.
205 This calls `anyio.run()` underneath.
207 Use it like this:
209 ```Python
210 async def program(name: str) -> str:
211 return f"Hello {name}"
214 result = asyncer.runnify(program)(name="World")
215 print(result)
216 ```
218 ## Arguments
220 `async_function`: an async function to call
221 `backend`: name of the asynchronous event loop implementation - currently either
222 `asyncio` or `trio`
223 `backend_options` keyword arguments to call the backend `run()` implementation with
225 ## Return
227 The return value of the async function
229 ## Raises
231 `RuntimeError`: if an asynchronous event loop is already running in this thread
232 `LookupError`: if the named backend is not found
233 """
235 @functools.wraps(async_function) 1=?@[]^_
236 def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval: 1=?@[]^_
237 partial_f = functools.partial(async_function, *args, **kwargs) 1=?@[]^_
239 return anyio.run(partial_f, backend=backend, backend_options=backend_options) 1=?@[]^_
241 return wrapper 1=?@[]^_
244def syncify( 1abcdefg
245 async_function: Callable[T_ParamSpec, Coroutine[Any, Any, T_Retval]],
246 raise_sync_error: bool = True,
247) -> Callable[T_ParamSpec, T_Retval]:
248 """
249 Take an async function and create a regular one that receives the same keyword and
250 positional arguments, and that when called, calls the original async function in
251 the main async loop from the worker thread using `anyio.to_thread.run()`.
253 By default this is expected to be used from a worker thread. For example inside
254 some function passed to `asyncify()`.
256 But if you set `raise_sync_error` to `False`, you can also use this function
257 in a non-async context: without an async event loop. For example, from a
258 blocking/regular function called at the top level of a Python file. In that case,
259 if it is not being called from inside a worker thread started from an async context
260 (e.g. this is not called from a function that was called with `asyncify()`) it will
261 run `async_function` in a new async event loop with `anyio.run()`.
263 This functionality with `raise_sync_error` is there only to allow using
264 `syncify()` in codebases that are used by async code in some cases and by blocking
265 code in others. For example, during migrations from blocking code to async code.
267 Internally, `asyncer.syncify()` uses the same `anyio.from_thread.run()`, but it
268 supports keyword arguments additional to positional arguments and it adds better
269 support for tooling (e.g. editor autocompletion and inline errors) for the
270 arguments and return value of the function.
272 Use it like this:
274 ```Python
275 async def do_work(arg1, arg2, kwarg1="", kwarg2=""):
276 # Do work
278 result = from_thread.syncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
279 ```
281 ## Arguments
283 `async_function`: an async function to be called in the main thread, in the async
284 event loop
285 `raise_sync_error`: If set to `False`, when used in a non-async context (without
286 an async event loop), it will run `async_function` in a new async event loop,
287 instead of raising an exception.
289 ## Return
291 A regular blocking function that takes the same positional and keyword arguments
292 as the original async one, that when called runs the same original function in
293 the main async loop when called from a worker thread and returns the result.
294 """
296 @functools.wraps(async_function) 1JXKLYMNZOP0QR1ST2UV3W
297 def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval: 1JXKLYMNZOP0QR1ST2UV3W
298 current_async_module = ( 1JXKLYMNZOP0QR1ST2UV3W
299 getattr(threadlocals, "current_token", None)
300 or
301 # TODO: remove when deprecating AnyIO 4.10.0
302 getattr(threadlocals, "current_async_backend", None)
303 or
304 # TODO: remove when deprecating AnyIO 3.x
305 getattr(threadlocals, "current_async_module", None)
306 )
307 partial_f = functools.partial(async_function, *args, **kwargs) 1JXKLYMNZOP0QR1ST2UV3W
308 if current_async_module is None and raise_sync_error is False: 1JXKLYMNZOP0QR1ST2UV3W
309 return anyio.run(partial_f) 1JKLMNOPQRSTUVW
310 return anyio.from_thread.run(partial_f) 1JXKLYMNZOP0QR1ST2UV3W
312 return wrapper 1JXKLYMNZOP0QR1ST2UV3W
315def asyncify( 1abcdefg
316 function: Callable[T_ParamSpec, T_Retval],
317 *,
318 abandon_on_cancel: bool = False,
319 cancellable: bool | None = None,
320 limiter: anyio.CapacityLimiter | None = None,
321) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
322 """
323 Take a blocking function and create an async one that receives the same
324 positional and keyword arguments, and that when called, calls the original function
325 in a worker thread using `anyio.to_thread.run_sync()`. Internally,
326 `asyncer.asyncify()` uses the same `anyio.to_thread.run_sync()`, but it supports
327 keyword arguments additional to positional arguments and it adds better support for
328 autocompletion and inline errors for the arguments of the function called and the
329 return value.
331 If the `cancellable` option is enabled and the task waiting for its completion is
332 cancelled, the thread will still run its course but its return value (or any raised
333 exception) will be ignored.
335 Use it like this:
337 ```Python
338 def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
339 # Do work
340 return "Some result"
342 result = await to_thread.asyncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
343 print(result)
344 ```
346 ## Arguments
348 `function`: a blocking regular callable (e.g. a function)
349 `cancellable`: `True` to allow cancellation of the operation
350 `limiter`: capacity limiter to use to limit the total amount of threads running
351 (if omitted, the default limiter is used)
353 ## Return
355 An async function that takes the same positional and keyword arguments as the
356 original one, that when called runs the same original function in a thread worker
357 and returns the result.
358 """
359 if cancellable is not None: 1#4J$XK%5L'YM(6N)ZO*7P+0Q,8R-1S.9T/2U:!V;3W
360 abandon_on_cancel = cancellable 1456789!
361 warn( 1456789!
362 "The `cancellable=` keyword argument to `asyncer.asyncify()` is "
363 "deprecated since Asyncer 0.0.8, following AnyIO 4.1.0. "
364 "Use `abandon_on_cancel=` instead.",
365 DeprecationWarning,
366 stacklevel=2,
367 )
369 @functools.wraps(function) 1#4J$XK%5L'YM(6N)ZO*7P+0Q,8R-1S.9T/2U:!V;3W
370 async def wrapper( 1#4J$XK%5L'YM(6N)ZO*7P+0Q,8R-1S.9T/2U:!V;3W
371 *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
372 ) -> T_Retval:
373 partial_f = functools.partial(function, *args, **kwargs) 1#4J$XK%5L'YM(6N)ZO*7P+0Q,8R-1S.9T/2U:!V;3W
375 return await run_sync( 1#4J$XK%5L'YM(6N)ZO*7P+0Q,8R-1S.9T/2U:!V;3W
376 partial_f, abandon_on_cancel=abandon_on_cancel, limiter=limiter
377 )
379 return wrapper 1#4J$XK%5L'YM(6N)ZO*7P+0Q,8R-1S.9T/2U:!V;3W