Coverage for asyncer/_main.py: 100%

83 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-09 01:12 +0000

1import functools 1abcde

2import sys 1abcde

3from importlib import import_module 1abcde

4from typing import ( 1abcde

5 Any, 

6 Awaitable, 

7 Callable, 

8 Coroutine, 

9 Dict, 

10 Generic, 

11 Optional, 

12 TypeVar, 

13 Union, 

14) 

15from warnings import warn 1abcde

16 

17from asyncer._compat import run_sync 1abcde

18 

19if sys.version_info >= (3, 10): 1abcde

20 from typing import ParamSpec 1cde

21else: 

22 from typing_extensions import ParamSpec 1ab

23 

24import anyio 1abcde

25import anyio.from_thread 1abcde

26import anyio.to_thread 1abcde

27import sniffio 1abcde

28from anyio._core._eventloop import threadlocals 1abcde

29from anyio.abc import TaskGroup as _TaskGroup 1abcde

30 

31 

32# This was obtained with: from anyio._core._eventloop import get_asynclib 

33# Removed in https://github.com/agronholm/anyio/pull/429 

34# Released in AnyIO 4.x.x 

35# The new function is anyio._core._eventloop.get_async_backend but that returns a 

36# class, not a module to extract the TaskGroup class from. 

def get_asynclib(asynclib_name: Union[str, None] = None) -> Any:
    """
    Return the AnyIO backend module for an async library.

    ## Arguments

    `asynclib_name`: name of the async library whose backend module is wanted. When
        it is `None`, the name is obtained from `sniffio.current_async_library()`.

    ## Return

    The module named `anyio._backends._<asynclib_name>`. It is taken from
    `sys.modules` when already loaded, and imported otherwise.
    """
    backend_name = (
        sniffio.current_async_library() if asynclib_name is None else asynclib_name
    )
    module_path = "anyio._backends._" + backend_name
    try:
        # Fast path: reuse the module object that is already loaded.
        backend_module = sys.modules[module_path]
    except KeyError:  # pragma: no cover
        backend_module = import_module(module_path)
    return backend_module

46 

47 

48T_Retval = TypeVar("T_Retval") 1abcde

49T_ParamSpec = ParamSpec("T_ParamSpec") 1abcde

50T = TypeVar("T") 1abcde

51 

52 

class PendingType:
    """Type of the `Pending` marker that a `SoonValue` holds before it has a result."""

    def __repr__(self) -> str:
        """Return the fixed text used to display the marker."""
        marker_text = "AsyncerPending"
        return marker_text


# Module-level marker instance; every new `SoonValue` starts out holding it.
Pending = PendingType()

59 

60 

class PendingValueException(Exception):
    """Raised by `SoonValue.value` while the stored value is still pending."""

63 

64 

class SoonValue(Generic[T]):
    """
    Holder for a value that becomes available later.

    A new instance holds the `Pending` marker. `TaskGroup.soonify()` creates one per
    started task and overwrites `_stored_value` with the task's return value once
    the task function has returned.
    """

    def __init__(self) -> None:
        # Starts as the `Pending` marker until a real value is stored.
        self._stored_value: Union[T, PendingType] = Pending

    @property
    def value(self) -> T:
        """
        Return the stored value.

        Raises `PendingValueException` while the stored value is still the pending
        marker.
        """
        stored = self._stored_value
        if not isinstance(stored, PendingType):
            return stored
        raise PendingValueException(
            "The return value of this task is still pending. Maybe you forgot to "
            "access it after the async with asyncer.create_task_group() block. "
            "If you need to access values of async tasks inside the same task "
            "group, you probably need a different approach, for example with "
            "AnyIO Streams."
        )

    @property
    def ready(self) -> bool:
        """Return `True` once a real value has replaced the pending marker."""
        still_pending = isinstance(self._stored_value, PendingType)
        return not still_pending

84 

85 

class TaskGroup(_TaskGroup):
    """AnyIO task group extended with the `soonify()` method."""

    def soonify(
        self, async_function: Callable[T_ParamSpec, Awaitable[T]], name: object = None
    ) -> Callable[T_ParamSpec, SoonValue[T]]:
        """
        Create and return a function that when called will start a new task in this
        task group.

        Internally it uses the same `task_group.start_soon()` method. But
        `task_group.soonify()` supports keyword arguments additional to positional
        arguments and it adds better support for autocompletion and inline errors
        for the arguments of the function called.

        Use it like this:

        ```Python
        async with asyncer.create_task_group() as task_group:
            async def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
                # Do work
                return "Some result value"

            result = task_group.soonify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")

        print(result.value)
        ```

        The return value from that function (`result` in the example) is an object
        `SoonValue`.

        This `SoonValue` object has an attribute `soon_value.value` that will hold the
        return value of the original `async_function` *after* the `async with` block.

        If you try to access the `soon_value.value` inside the `async with` block,
        before it has the actual return value, it will raise an exception
        `asyncer.PendingValueException`.

        If you think you need to access the return values inside the `async with` block,
        there's a high chance that you really need a different approach, for example
        using an AnyIO Stream.

        But either way, if you have checkpoints inside the `async with` block (you have
        some `await` there), one or more of the `SoonValue` objects you might have
        could end up having the result value ready before ending the `async with` block.
        You can check that with `soon_value.ready`. For example:

        ```Python
        async def do_work(name: str) -> str:
            return f"Hello {name}"

        async with asyncer.create_task_group() as task_group:
            result1 = task_group.soonify(do_work)(name="task 1")
            result2 = task_group.soonify(do_work)(name="task 2")
            await anyio.sleep(0)
            if result1.ready:
                print(result1.value)
            if result2.ready:
                print(result2.value)
        ```


        ## Arguments

        `async_function`: an async function to call soon
        `name`: name of the task, for the purposes of introspection and debugging

        ## Return

        A function that takes positional and keyword arguments and when called
        uses `task_group.start_soon()` to start the task in this task group.

        That function returns a `SoonValue` object holding the return value of the
        original function in `soon_value.value`.
        """

        @functools.wraps(async_function)
        def wrapper(
            *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
        ) -> SoonValue[T]:
            # Bind all arguments now, so keyword arguments are carried along too.
            partial_f = functools.partial(async_function, *args, **kwargs)
            soon_value: SoonValue[T] = SoonValue()

            @functools.wraps(partial_f)
            async def value_wrapper(*args: Any) -> None:
                # Any positional arguments received here are ignored: the real
                # arguments are already bound in `partial_f`.
                value = await partial_f()
                # Only reached when the task function returned without raising.
                soon_value._stored_value = value

            self.start_soon(value_wrapper, name=name)
            return soon_value

        return wrapper

    # This is only for the return type annotation, but it won't really be called
    async def __aenter__(self) -> "TaskGroup":  # pragma: nocover
        """Enter the task group context and allow starting new tasks."""
        return await super().__aenter__()  # type: ignore

181 

182 

def create_task_group() -> "TaskGroup":
    """
    Build a task group for starting several concurrent tasks from async functions.

    Unlike `anyio.create_task_group()`, the object returned by
    `asyncer.create_task_group()` is an extended `TaskGroup` that also provides the
    `task_group.soonify()` method.
    """

    # Task group class of the backend module returned by `get_asynclib()`.
    backend_task_group_cls = get_asynclib().TaskGroup

    # Combine the backend implementation with the `soonify()` extension.
    class ExtendedTaskGroup(backend_task_group_cls, TaskGroup):  # type: ignore
        pass

    return ExtendedTaskGroup()

198 

199 

def runnify(
    async_function: Callable[T_ParamSpec, Coroutine[Any, Any, T_Retval]],
    backend: str = "asyncio",
    backend_options: Optional[Dict[str, Any]] = None,
) -> Callable[T_ParamSpec, T_Retval]:
    """
    Turn an async function into a regular (blocking) function.

    The returned function accepts the same positional and keyword arguments as the
    original `async_function`. Calling it runs `async_function` with those
    arguments through `anyio.run()` and returns its return value.

    The current thread must not be already running an event loop.

    Use it like this:

    ```Python
    async def program(name: str) -> str:
        return f"Hello {name}"


    result = asyncer.runnify(program)(name="World")
    print(result)
    ```

    ## Arguments

    `async_function`: an async function to call
    `backend`: name of the asynchronous event loop implementation - currently either
        `asyncio` or `trio`
    `backend_options` keyword arguments to call the backend `run()` implementation with

    ## Return

    A blocking function that, when called, returns the return value of the async
    function

    ## Raises

    When the returned function is called:

    `RuntimeError`: if an asynchronous event loop is already running in this thread
    `LookupError`: if the named backend is not found
    """

    @functools.wraps(async_function)
    def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:
        # `anyio.run()` is not given keyword arguments for the function here, so
        # bind everything first.
        bound_call = functools.partial(async_function, *args, **kwargs)
        result = anyio.run(
            bound_call, backend=backend, backend_options=backend_options
        )
        return result

    return wrapper

252 

253 

def syncify(
    async_function: Callable[T_ParamSpec, Coroutine[Any, Any, T_Retval]],
    raise_sync_error: bool = True,
) -> Callable[T_ParamSpec, T_Retval]:
    """
    Take an async function and create a regular one that receives the same keyword and
    positional arguments, and that when called, calls the original async function in
    the main async loop from the worker thread using `anyio.from_thread.run()`.

    By default this is expected to be used from a worker thread. For example inside
    some function passed to `asyncify()`.

    But if you set `raise_sync_error` to `False`, you can also use this function
    in a non-async context: without an async event loop. For example, from a
    blocking/regular function called at the top level of a Python file. In that case,
    if it is not being called from inside a worker thread started from an async context
    (e.g. this is not called from a function that was called with `asyncify()`) it will
    run `async_function` in a new async event loop with `anyio.run()`.

    This functionality with `raise_sync_error` is there only to allow using
    `syncify()` in codebases that are used by async code in some cases and by blocking
    code in others. For example, during migrations from blocking code to async code.

    Internally, `asyncer.syncify()` uses the same `anyio.from_thread.run()`, but it
    supports keyword arguments additional to positional arguments and it adds better
    support for tooling (e.g. editor autocompletion and inline errors) for the
    arguments and return value of the function.

    Use it like this:

    ```Python
    async def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
        # Do work
        return "Some result"

    result = asyncer.syncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
    ```

    ## Arguments

    `async_function`: an async function to be called in the main thread, in the async
        event loop
    `raise_sync_error`: If set to `False`, when used in a non-async context (without
        an async event loop), it will run `async_function` in a new async event loop,
        instead of raising an exception.

    ## Return

    A regular blocking function that takes the same positional and keyword arguments
    as the original async one, that when called runs the same original function in
    the main async loop when called from a worker thread and returns the result.
    """

    @functools.wraps(async_function)
    def wrapper(*args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs) -> T_Retval:
        # Look for an async backend recorded on AnyIO's thread-local state; `None`
        # is treated below as "no async context for this thread".
        current_async_module = (
            getattr(threadlocals, "current_async_backend", None)
            or
            # TODO: remove when deprecating AnyIO 3.x
            getattr(threadlocals, "current_async_module", None)
        )
        partial_f = functools.partial(async_function, *args, **kwargs)
        if current_async_module is None and raise_sync_error is False:
            # No async context and the caller opted in: run in a new event loop.
            return anyio.run(partial_f)
        return anyio.from_thread.run(partial_f)

    return wrapper

320 

321 

def asyncify(
    function: Callable[T_ParamSpec, T_Retval],
    *,
    abandon_on_cancel: bool = False,
    cancellable: Union[bool, None] = None,
    limiter: Optional[anyio.CapacityLimiter] = None,
) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
    """
    Take a blocking function and create an async one that receives the same
    positional and keyword arguments, and that when called, calls the original function
    in a worker thread using `anyio.to_thread.run_sync()`. Internally,
    `asyncer.asyncify()` uses the same `anyio.to_thread.run_sync()`, but it supports
    keyword arguments additional to positional arguments and it adds better support for
    autocompletion and inline errors for the arguments of the function called and the
    return value.

    If the `abandon_on_cancel` option is enabled and the task waiting for its
    completion is cancelled, the thread will still run its course but its return value
    (or any raised exception) will be ignored.

    Use it like this:

    ```Python
    def do_work(arg1, arg2, kwarg1="", kwarg2="") -> str:
        # Do work
        return "Some result"

    result = await asyncer.asyncify(do_work)("spam", "ham", kwarg1="a", kwarg2="b")
    print(result)
    ```

    ## Arguments

    `function`: a blocking regular callable (e.g. a function)
    `abandon_on_cancel`: `True` to abandon the thread (leaving it to run unchecked on
        its own) if the task waiting for it is cancelled
    `cancellable`: deprecated alias of `abandon_on_cancel`. When it is not `None` it
        overrides `abandon_on_cancel` and a `DeprecationWarning` is emitted.
    `limiter`: capacity limiter to use to limit the total amount of threads running
        (if omitted, the default limiter is used)

    ## Return

    An async function that takes the same positional and keyword arguments as the
    original one, that when called runs the same original function in a thread worker
    and returns the result.
    """
    if cancellable is not None:
        # Honor the deprecated keyword, but tell the caller to migrate.
        abandon_on_cancel = cancellable
        warn(
            "The `cancellable=` keyword argument to `asyncer.asyncify()` is "
            "deprecated since Asyncer 0.0.8, following AnyIO 4.1.0. "
            "Use `abandon_on_cancel=` instead.",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller of `asyncify()`
        )

    @functools.wraps(function)
    async def wrapper(
        *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
    ) -> T_Retval:
        # Bind all arguments up front so keyword arguments reach `function` too.
        partial_f = functools.partial(function, *args, **kwargs)

        return await run_sync(
            partial_f, abandon_on_cancel=abandon_on_cancel, limiter=limiter
        )

    return wrapper