Coverage for faststream / middlewares / logging.py: 95%

18 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import logging 

2from typing import TYPE_CHECKING, Any, Optional 

3 

4from faststream._internal.middlewares import BaseMiddleware 

5from faststream.exceptions import IgnoredException 

6from faststream.message.source_type import SourceType 

7 

8if TYPE_CHECKING: 

9 from types import TracebackType 

10 

11 from faststream._internal.basic_types import AsyncFuncAny 

12 from faststream._internal.context.repository import ContextRepo 

13 from faststream._internal.logger import LoggerState 

14 from faststream.message import StreamMessage 

15 

16 

17class CriticalLogMiddleware: 

18 def __init__(self, logger: "LoggerState") -> None: 

19 """Initialize the class.""" 

20 self.logger = logger 

21 

22 def __call__( 

23 self, 

24 msg: Any | None, 

25 /, 

26 *, 

27 context: "ContextRepo", 

28 ) -> "_LoggingMiddleware": 

29 return _LoggingMiddleware( 

30 logger=self.logger, 

31 msg=msg, 

32 context=context, 

33 ) 

34 

35 

class _LoggingMiddleware(BaseMiddleware):
    """A middleware class for logging critical errors.

    It also logs a "Received" entry before the message is passed on and a
    "Processed" entry on exit, except when the message source type is
    ``SourceType.RESPONSE``.
    """

    def __init__(
        self,
        *,
        logger: "LoggerState",
        context: "ContextRepo",
        msg: Any | None,
    ) -> None:
        """Initialise the base middleware and keep the logger state."""
        super().__init__(msg, context=context)
        self.logger = logger
        # Default until `consume_scope` sees the actual message.
        self.source_type = SourceType.CONSUME

    async def consume_scope(
        self,
        call_next: "AsyncFuncAny",
        msg: "StreamMessage[Any]",
    ) -> Any:
        """Record the message source type, log receipt, then call ``call_next``.

        Returns:
            Whatever ``call_next(msg)`` returns.
        """
        incoming_type = msg.source_type
        # Stored so that `__aexit__` can apply the same RESPONSE check.
        self.source_type = incoming_type

        if incoming_type is not SourceType.RESPONSE:
            log_extra = self.context.get_local("log_context", {})
            self.logger.log("Received", extra=log_extra)

        return await call_next(msg)

    def _log_exception(
        self,
        exc_type: type[BaseException],
        exc_val: BaseException | None,
        log_extra: Any,
    ) -> None:
        """Log a raised exception; non-ignored ones are logged at ERROR level."""
        # TODO: move critical logging to `subscriber.consume()` method
        if issubclass(exc_type, IgnoredException):
            self.logger.log(
                message=str(exc_val),
                extra=log_extra,
            )
            return

        self.logger.log(
            message=f"{exc_type.__name__}: {exc_val}",
            log_level=logging.ERROR,
            exc_info=exc_val,
            extra=log_extra,
        )

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_val: BaseException | None = None,
        exc_tb: Optional["TracebackType"] = None,
    ) -> bool:
        """Asynchronously called after processing.

        Logs any raised exception followed by a "Processed" entry (skipped
        for RESPONSE messages), then delegates to the parent ``__aexit__``.

        Returns:
            Always ``False``, so the exception is not suppressed here.
        """
        # NOTE(review): the coverage report this listing was taken from shows
        # this condition was always true in the measured run, i.e. the
        # RESPONSE path through `__aexit__` was not exercised.
        if self.source_type is not SourceType.RESPONSE:
            log_extra = self.context.get_local("log_context", {})

            if exc_type:
                self._log_exception(exc_type, exc_val, log_extra)

            self.logger.log(message="Processed", extra=log_extra)

        await super().__aexit__(exc_type, exc_val, exc_tb)

        # Exception was not processed
        return False
96 return False