Coverage for faststream / _internal / endpoint / subscriber / supervisor.py: 97%

43 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from __future__ import annotations 

2 

3import logging 

4import os 

5import time 

6from asyncio import CancelledError, Task 

7from collections import UserDict 

8from typing import TYPE_CHECKING, Any 

9 

10if TYPE_CHECKING: 

11 from collections.abc import Callable, Coroutine 

12 

13 from faststream._internal.endpoint.subscriber.mixins import TasksMixin 

14 

15 

16class _SupervisorCache(UserDict[int, float]): 

17 @property 

18 def ttl(self) -> float: 

19 return float(os.getenv("FASTSTREAM_SUPERVISOR_CACHE_TTL", "3600")) 

20 

21 def _cleanup(self) -> None: 

22 # no need to clean up empty storage 

23 if not len(self): 

24 return 

25 

26 # dict preserves insertion order hence if the first element is not outdated there is no reason to check others 

27 _, timestamp = next(iter(self.items())) 

28 now = time.time() 

29 if now - timestamp < self.ttl: 

30 return 

31 

32 now, to_delete = time.time(), [] 

33 for k, v in self.items(): 

34 if now - v >= self.ttl: 34 ↛ 37line 34 didn't jump to line 37 because the condition on line 34 was always true

35 to_delete.append(k) 

36 else: 

37 break 

38 

39 for k in to_delete: 

40 del self[k] 

41 

42 def add(self, item: int) -> None: 

43 self[item] = time.time() 

44 

45 def __contains__(self, key: object) -> bool: 

46 self._cleanup() 

47 return super().__contains__(key) 

48 

49 

50class TaskCallbackSupervisor: 

51 """Supervisor for asyncio.Task spawned in TaskMixin implemented via task callback.""" 

52 

53 __slots__ = ( 

54 "args", 

55 "func", 

56 "ignored_exceptions", 

57 "kwargs", 

58 "max_attempts", 

59 "subscriber", 

60 ) 

61 

62 # stores hash identifier of exceptions which whose info was printed 

63 __cache: _SupervisorCache = _SupervisorCache() 

64 

65 def __init__( 

66 self, 

67 func: Callable[..., Coroutine[Any, Any, Any]], 

68 func_args: tuple[Any] | None, 

69 func_kwargs: dict[str, Any] | None, 

70 subscriber: TasksMixin, 

71 *, 

72 ignored_exceptions: tuple[type[BaseException], ...] = (CancelledError,), 

73 ) -> None: 

74 self.subscriber = subscriber 

75 self.func = func 

76 self.args = func_args or () 

77 self.kwargs = func_kwargs or {} 

78 self.ignored_exceptions = ignored_exceptions 

79 

80 @staticmethod 

81 def _get_exception_identifier( 

82 exception: BaseException, *, message_size: int = 4096 

83 ) -> int: 

84 # NOTE: method accepts only raised exceptions (e.g. with __traceback__) 

85 line, message, klass = ( 

86 getattr(exception.__traceback__, "tb_lineno", 0), 

87 str(exception)[:message_size], 

88 str(exception.__class__), 

89 ) 

90 to_hash = f"{line}-{message}-{klass}" 

91 

92 return hash(to_hash) 

93 

94 @property 

95 def is_disabled(self) -> bool: 

96 # supervisor can affect some test cases, so it might be useful to have global killswitch. 

97 return bool(int(os.getenv("FASTSTREAM_SUPERVISOR_DISABLED", "0"))) 

98 

99 def __call__(self, task: Task[Any]) -> None: 

100 logger = self.subscriber._outer_config.logger 

101 

102 logger.log( 

103 f"callback for {task.get_name()} is being executed...", 

104 log_level=logging.INFO, 

105 ) 

106 

107 if task.cancelled() or self.is_disabled: 

108 return 

109 

110 if (exc := task.exception()) and not isinstance(exc, self.ignored_exceptions): 

111 # trace is printed only once, but task is still retried 

112 identifier = self._get_exception_identifier(exc) 

113 if identifier not in self.__cache: 

114 self.__cache.add(identifier) 

115 logger.log( 

116 f"{task.get_name()} raised an exception, retrying...\n" 

117 "If this behavior causes issues, you can disable it via setting the FASTSTREAM_SUPERVISOR_DISABLED env to 1. " 

118 "Also, please consider opening issue on the repository: https://github.com/ag2ai/faststream.", 

119 exc_info=exc, 

120 log_level=logging.ERROR, 

121 ) 

122 

123 self.subscriber.add_task(self.func, self.args, self.kwargs)