Coverage for faststream / _internal / logger / params_storage.py: 88%

28 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import logging 

2from abc import abstractmethod 

3from typing import TYPE_CHECKING, Any, Optional, Protocol 

4from weakref import WeakSet 

5 

6from faststream._internal.constants import EMPTY 

7 

8if TYPE_CHECKING: 

9 from faststream._internal.basic_types import LoggerProto 

10 from faststream._internal.context import ContextRepo 

11 

12 

13def make_logger_storage( 

14 logger: Optional["LoggerProto"], 

15 default_storage_cls: type["DefaultLoggerStorage"], 

16) -> "LoggerParamsStorage": 

17 if logger is EMPTY: 

18 return default_storage_cls() 

19 

20 return EmptyLoggerStorage() if logger is None else ManualLoggerStorage(logger) 

21 

22 

23class LoggerParamsStorage(Protocol): 

24 def register_subscriber(self, params: dict[str, Any]) -> None: ... 

25 

26 def get_logger(self, *, context: "ContextRepo") -> Optional["LoggerProto"]: ... 

27 

28 def set_level(self, level: int) -> None: ... 

29 

30 

31class EmptyLoggerStorage(LoggerParamsStorage): 

32 def register_subscriber(self, params: dict[str, Any]) -> None: 

33 pass 

34 

35 def get_logger(self, *, context: "ContextRepo") -> None: 

36 return None 

37 

38 def set_level(self, level: int) -> None: 

39 pass 

40 

41 

42class ManualLoggerStorage(LoggerParamsStorage): 

43 def __init__(self, logger: "LoggerProto") -> None: 

44 self.__logger = logger 

45 

46 def register_subscriber(self, params: dict[str, Any]) -> None: 

47 pass 

48 

49 def get_logger(self, *, context: "ContextRepo") -> "LoggerProto": 

50 return self.__logger 

51 

52 def set_level(self, level: int) -> None: 

53 if getattr(self.__logger, "setLevel", None): 

54 self.__logger.setLevel(level) # type: ignore[attr-defined] 

55 

56 

57class DefaultLoggerStorage(LoggerParamsStorage): 

58 def __init__(self) -> None: 

59 # will be used to build logger in `get_logger` method 

60 self.logger_log_level = logging.INFO 

61 

62 self._logger_ref = WeakSet[logging.Logger]() 

63 

64 @abstractmethod 

65 def get_logger(self, *, context: "ContextRepo") -> "LoggerProto": 

66 raise NotImplementedError 

67 

68 def _get_logger_ref(self) -> logging.Logger | None: 

69 if self._logger_ref: 69 ↛ 70line 69 didn't jump to line 70 because the condition on line 69 was never true

70 return next(iter(self._logger_ref)) 

71 

72 return None 

73 

74 def set_level(self, level: int) -> None: 

75 if lg := self._get_logger_ref(): 

76 lg.setLevel(level) 

77 

78 self.logger_log_level = level