Coverage for asyncer/_main.py: 100%
83 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-09 01:12 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-09 01:12 +0000
1import functools 1abcde
2import sys 1abcde
3from importlib import import_module 1abcde
4from typing import ( 1abcde
5 Any,
6 Awaitable,
7 Callable,
8 Coroutine,
9 Dict,
10 Generic,
11 Optional,
12 TypeVar,
13 Union,
14)
15from warnings import warn 1abcde
17from asyncer._compat import run_sync 1abcde
19if sys.version_info >= (3, 10): 1abcde
20 from typing import ParamSpec 1cde
21else:
22 from typing_extensions import ParamSpec 1ab
24import anyio 1abcde
25import anyio.from_thread 1abcde
26import anyio.to_thread 1abcde
27import sniffio 1abcde
28from anyio._core._eventloop import threadlocals 1abcde
29from anyio.abc import TaskGroup as _TaskGroup 1abcde
# This was obtained with: from anyio._core._eventloop import get_asynclib
# Removed in https://github.com/agronholm/anyio/pull/429
# Released in AnyIO 4.x.x
# The new function is anyio._core._eventloop.get_async_backend but that returns a
# class, not a module to extract the TaskGroup class from.
def get_asynclib(asynclib_name: Union[str, None] = None) -> Any:
    """Return the AnyIO backend module for the given (or current) async library.

    Local replacement for the `get_asynclib` helper that AnyIO 4.x removed.
    """
    if asynclib_name is None:
        # Detect which event loop implementation is currently running.
        asynclib_name = sniffio.current_async_library()
    modulename = "anyio._backends._" + asynclib_name
    if modulename in sys.modules:
        return sys.modules[modulename]
    return import_module(modulename)  # pragma: no cover
# Type variables shared by the public wrappers in this module.
T_Retval = TypeVar("T_Retval")  # return type of the wrapped (a)sync function
T_ParamSpec = ParamSpec("T_ParamSpec")  # parameters of the wrapped function
T = TypeVar("T")  # value type eventually carried by a SoonValue
class PendingType:
    """Sentinel type marking a task result that is not yet available."""

    def __repr__(self) -> str:
        # Fixed representation so pending values are easy to spot when debugging.
        return "AsyncerPending"
# Module-level singleton used as the "no value yet" marker inside SoonValue.
Pending = PendingType()
class PendingValueException(Exception):
    """Raised when a `SoonValue.value` is accessed before the task has finished."""

    pass
class SoonValue(Generic[T]):
    """Holder that will eventually contain the return value of a soonified task."""

    def __init__(self) -> None:
        # Starts out as the Pending sentinel; the task group's internal wrapper
        # replaces it with the real result once the task completes.
        self._stored_value: Union[T, PendingType] = Pending

    @property
    def value(self) -> T:
        """Return the task's result, raising `PendingValueException` if not ready."""
        stored = self._stored_value
        if not isinstance(stored, PendingType):
            return stored
        raise PendingValueException(
            "The return value of this task is still pending. Maybe you forgot to "
            "access it after the async with asyncer.create_task_group() block. "
            "If you need to access values of async tasks inside the same task "
            "group, you probably need a different approach, for example with "
            "AnyIO Streams."
        )

    @property
    def ready(self) -> bool:
        """Whether the task finished and `value` can be read without raising."""
        return not isinstance(self._stored_value, PendingType)
class TaskGroup(_TaskGroup):
    """AnyIO task group extended with the `soonify()` convenience method."""

    def soonify(
        self, async_function: Callable[T_ParamSpec, Awaitable[T]], name: object = None
    ) -> Callable[T_ParamSpec, SoonValue[T]]:
        """
        Create and return a function that when called will start a new task in this
        task group.

        Internally it uses the same `task_group.start_soon()` method. But
        `task_group.soonify()` supports keyword arguments additional to positional
        arguments and it adds better support for autocompletion and inline errors
        for the arguments of the function called.

        Use it like this:

        ```Python
        async with asyncer.create_task_group() as task_group:
            async def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
                # Do work
                return "Some result value"

            result = task_group.soonify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")

        print(result.value)
        ```

        The return value from that function (`result` in the example) is an object
        `SoonValue`.

        This `SoonValue` object has an attribute `soon_value.value` that will hold the
        return value of the original `async_function` *after* the `async with` block.

        If you try to access the `soon_value.value` inside the `async with` block,
        before it has the actual return value, it will raise an exception
        `asyncer.PendingValueException`.

        If you think you need to access the return values inside the `async with` block,
        there's a high chance that you really need a different approach, for example
        using an AnyIO Stream.

        But either way, if you have checkpoints inside the `async with` block (you have
        some `await` there), one or more of the `SoonValue` objects you might have
        could end up having the result value ready before ending the `async with` block.
        You can check that with `soon_value.ready`. For example:

        ```Python
        async def do_work(name: str) -> str:
            return f"Hello {name}"

        async with asyncer.create_task_group() as task_group:
            result1 = task_group.soonify(do_work)(name="task 1")
            result2 = task_group.soonify(do_work)(name="task 2")
            await anyio.sleep(0)
            if result1.ready:
                print(result1.value)
            if result2.ready:
                print(result2.value)
        ```

        ## Arguments

        `async_function`: an async function to call soon
        `name`: name of the task, for the purposes of introspection and debugging

        ## Return

        A function that takes positional and keyword arguments and when called
        uses `task_group.start_soon()` to start the task in this task group.

        That function returns a `SoonValue` object holding the return value of the
        original function in `soon_value.value`.
        """

        @functools.wraps(async_function)
        def wrapper(
            *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
        ) -> SoonValue[T]:
            # Bind the arguments now; start_soon() only forwards positional args.
            partial_f = functools.partial(async_function, *args, **kwargs)
            soon_value: SoonValue[T] = SoonValue()

            @functools.wraps(partial_f)
            async def value_wrapper(*args: Any) -> None:
                # Run the task and stash its result in the SoonValue container.
                value = await partial_f()
                soon_value._stored_value = value

            self.start_soon(value_wrapper, name=name)
            return soon_value

        return wrapper

    # This is only for the return type annotation, but it won't really be called
    async def __aenter__(self) -> "TaskGroup":  # pragma: nocover
        """Enter the task group context and allow starting new tasks."""
        return await super().__aenter__()  # type: ignore
def create_task_group() -> "TaskGroup":
    """
    Create a task group used to start multiple concurrent tasks with async functions.

    `asyncer.create_task_group()` is different from `anyio.create_task_group()` in that
    it creates an extended `TaskGroup` object that includes the `task_group.soonify()`
    method.
    """
    # Grab the concrete TaskGroup implementation for the running backend
    # (asyncio or trio) and mix in Asyncer's soonify-enabled TaskGroup.
    backend_task_group = get_asynclib().TaskGroup

    class ExtendedTaskGroup(backend_task_group, TaskGroup):  # type: ignore
        pass

    return ExtendedTaskGroup()
def runnify(
    async_function: Callable[T_ParamSpec, Coroutine[Any, Any, T_Retval]],
    backend: str = "asyncio",
    backend_options: Optional[Dict[str, Any]] = None,
) -> Callable[T_ParamSpec, T_Retval]:
    """
    Take an async function and create a regular (blocking) function that receives the
    same keyword and positional arguments for the original async function, and that when
    called will create an event loop and use it to run the original `async_function`
    with those arguments.

    That function returns the return value from the original `async_function`.

    The current thread must not be already running an event loop.

    This calls `anyio.run()` underneath.

    Use it like this:

    ```Python
    async def program(name: str) -> str:
        return f"Hello {name}"


    result = asyncer.runnify(program)(name="World")
    print(result)
    ```

    ## Arguments

    `async_function`: an async function to call
    `backend`: name of the asynchronous event loop implementation - currently either
        `asyncio` or `trio`
    `backend_options` keyword arguments to call the backend `run()` implementation with

    ## Return

    The return value of the async function

    ## Raises

    `RuntimeError`: if an asynchronous event loop is already running in this thread
    `LookupError`: if the named backend is not found
    """

    @functools.wraps(async_function)
    def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:
        # Bind the call arguments up front: anyio.run() expects a function that
        # takes no keyword arguments.
        bound_f = functools.partial(async_function, *args, **kwargs)
        return anyio.run(bound_f, backend=backend, backend_options=backend_options)

    return wrapper
def syncify(
    async_function: Callable[T_ParamSpec, Coroutine[Any, Any, T_Retval]],
    raise_sync_error: bool = True,
) -> Callable[T_ParamSpec, T_Retval]:
    """
    Take an async function and create a regular one that receives the same keyword and
    positional arguments, and that when called, calls the original async function in
    the main async loop from the worker thread using `anyio.to_thread.run()`.

    By default this is expected to be used from a worker thread. For example inside
    some function passed to `asyncify()`.

    But if you set `raise_sync_error` to `False`, you can also use this function
    in a non-async context: without an async event loop. For example, from a
    blocking/regular function called at the top level of a Python file. In that case,
    if it is not being called from inside a worker thread started from an async context
    (e.g. this is not called from a function that was called with `asyncify()`) it will
    run `async_function` in a new async event loop with `anyio.run()`.

    This functionality with `raise_sync_error` is there only to allow using
    `syncify()` in codebases that are used by async code in some cases and by blocking
    code in others. For example, during migrations from blocking code to async code.

    Internally, `asyncer.syncify()` uses the same `anyio.from_thread.run()`, but it
    supports keyword arguments additional to positional arguments and it adds better
    support for tooling (e.g. editor autocompletion and inline errors) for the
    arguments and return value of the function.

    Use it like this:

    ```Python
    async def do_work(arg1, arg2, kwarg1="", kwarg2=""):
        # Do work

    result = from_thread.syncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
    ```

    ## Arguments

    `async_function`: an async function to be called in the main thread, in the async
        event loop
    `raise_sync_error`: If set to `False`, when used in a non-async context (without
        an async event loop), it will run `async_function` in a new async event loop,
        instead of raising an exception.

    ## Return

    A regular blocking function that takes the same positional and keyword arguments
    as the original async one, that when called runs the same original function in
    the main async loop when called from a worker thread and returns the result.
    """

    @functools.wraps(async_function)
    def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:
        # AnyIO 4.x records the backend as "current_async_backend"; AnyIO 3.x
        # used "current_async_module".
        # TODO: remove the fallback when deprecating AnyIO 3.x
        async_backend = getattr(threadlocals, "current_async_backend", None)
        if async_backend is None:
            async_backend = getattr(threadlocals, "current_async_module", None)
        bound_f = functools.partial(async_function, *args, **kwargs)
        if async_backend is None and raise_sync_error is False:
            # Not inside a worker thread spawned from async code: run in a
            # brand-new event loop instead of raising.
            return anyio.run(bound_f)
        return anyio.from_thread.run(bound_f)

    return wrapper
def asyncify(
    function: Callable[T_ParamSpec, T_Retval],
    *,
    abandon_on_cancel: bool = False,
    cancellable: Union[bool, None] = None,
    limiter: Optional[anyio.CapacityLimiter] = None,
) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
    """
    Take a blocking function and create an async one that receives the same
    positional and keyword arguments, and that when called, calls the original function
    in a worker thread using `anyio.to_thread.run_sync()`. Internally,
    `asyncer.asyncify()` uses the same `anyio.to_thread.run_sync()`, but it supports
    keyword arguments additional to positional arguments and it adds better support for
    autocompletion and inline errors for the arguments of the function called and the
    return value.

    If the `abandon_on_cancel` option is enabled and the task waiting for its
    completion is cancelled, the thread will still run its course but its return value
    (or any raised exception) will be ignored.

    Use it like this:

    ```Python
    def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
        # Do work
        return "Some result"

    result = await to_thread.asyncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
    print(result)
    ```

    ## Arguments

    `function`: a blocking regular callable (e.g. a function)
    `abandon_on_cancel`: `True` to abandon the thread (letting it run to completion
        with its result ignored) if the calling task is cancelled
    `cancellable`: deprecated alias for `abandon_on_cancel`, kept for backwards
        compatibility; if passed, it overrides `abandon_on_cancel` and emits a
        `DeprecationWarning`
    `limiter`: capacity limiter to use to limit the total amount of threads running
        (if omitted, the default limiter is used)

    ## Return

    An async function that takes the same positional and keyword arguments as the
    original one, that when called runs the same original function in a thread worker
    and returns the result.
    """
    if cancellable is not None:
        # Legacy keyword: mirror it onto the new parameter, then warn the caller.
        abandon_on_cancel = cancellable
        warn(
            "The `cancellable=` keyword argument to `asyncer.asyncify()` is "
            "deprecated since Asyncer 0.0.8, following AnyIO 4.1.0. "
            "Use `abandon_on_cancel=` instead.",
            DeprecationWarning,
            stacklevel=2,
        )

    @functools.wraps(function)
    async def wrapper(
        *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
    ) -> T_Retval:
        # Bind arguments now; run_sync() forwards the call to a worker thread.
        partial_f = functools.partial(function, *args, **kwargs)

        return await run_sync(
            partial_f, abandon_on_cancel=abandon_on_cancel, limiter=limiter
        )

    return wrapper