Coverage for faststream / _internal / endpoint / subscriber / supervisor.py: 97%
43 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from __future__ import annotations
3import logging
4import os
5import time
6from asyncio import CancelledError, Task
7from collections import UserDict
8from typing import TYPE_CHECKING, Any
10if TYPE_CHECKING:
11 from collections.abc import Callable, Coroutine
13 from faststream._internal.endpoint.subscriber.mixins import TasksMixin
16class _SupervisorCache(UserDict[int, float]):
17 @property
18 def ttl(self) -> float:
19 return float(os.getenv("FASTSTREAM_SUPERVISOR_CACHE_TTL", "3600"))
21 def _cleanup(self) -> None:
22 # no need to clean up empty storage
23 if not len(self):
24 return
26 # dict preserves insertion order hence if the first element is not outdated there is no reason to check others
27 _, timestamp = next(iter(self.items()))
28 now = time.time()
29 if now - timestamp < self.ttl:
30 return
32 now, to_delete = time.time(), []
33 for k, v in self.items():
34 if now - v >= self.ttl: 34 ↛ 37line 34 didn't jump to line 37 because the condition on line 34 was always true
35 to_delete.append(k)
36 else:
37 break
39 for k in to_delete:
40 del self[k]
42 def add(self, item: int) -> None:
43 self[item] = time.time()
45 def __contains__(self, key: object) -> bool:
46 self._cleanup()
47 return super().__contains__(key)
50class TaskCallbackSupervisor:
51 """Supervisor for asyncio.Task spawned in TaskMixin implemented via task callback."""
53 __slots__ = (
54 "args",
55 "func",
56 "ignored_exceptions",
57 "kwargs",
58 "max_attempts",
59 "subscriber",
60 )
62 # stores hash identifier of exceptions which whose info was printed
63 __cache: _SupervisorCache = _SupervisorCache()
65 def __init__(
66 self,
67 func: Callable[..., Coroutine[Any, Any, Any]],
68 func_args: tuple[Any] | None,
69 func_kwargs: dict[str, Any] | None,
70 subscriber: TasksMixin,
71 *,
72 ignored_exceptions: tuple[type[BaseException], ...] = (CancelledError,),
73 ) -> None:
74 self.subscriber = subscriber
75 self.func = func
76 self.args = func_args or ()
77 self.kwargs = func_kwargs or {}
78 self.ignored_exceptions = ignored_exceptions
80 @staticmethod
81 def _get_exception_identifier(
82 exception: BaseException, *, message_size: int = 4096
83 ) -> int:
84 # NOTE: method accepts only raised exceptions (e.g. with __traceback__)
85 line, message, klass = (
86 getattr(exception.__traceback__, "tb_lineno", 0),
87 str(exception)[:message_size],
88 str(exception.__class__),
89 )
90 to_hash = f"{line}-{message}-{klass}"
92 return hash(to_hash)
94 @property
95 def is_disabled(self) -> bool:
96 # supervisor can affect some test cases, so it might be useful to have global killswitch.
97 return bool(int(os.getenv("FASTSTREAM_SUPERVISOR_DISABLED", "0")))
99 def __call__(self, task: Task[Any]) -> None:
100 logger = self.subscriber._outer_config.logger
102 logger.log(
103 f"callback for {task.get_name()} is being executed...",
104 log_level=logging.INFO,
105 )
107 if task.cancelled() or self.is_disabled:
108 return
110 if (exc := task.exception()) and not isinstance(exc, self.ignored_exceptions):
111 # trace is printed only once, but task is still retried
112 identifier = self._get_exception_identifier(exc)
113 if identifier not in self.__cache:
114 self.__cache.add(identifier)
115 logger.log(
116 f"{task.get_name()} raised an exception, retrying...\n"
117 "If this behavior causes issues, you can disable it via setting the FASTSTREAM_SUPERVISOR_DISABLED env to 1. "
118 "Also, please consider opening issue on the repository: https://github.com/ag2ai/faststream.",
119 exc_info=exc,
120 log_level=logging.ERROR,
121 )
123 self.subscriber.add_task(self.func, self.args, self.kwargs)