Coverage for faststream / middlewares / logging.py: 95%
18 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import logging
2from typing import TYPE_CHECKING, Any, Optional
4from faststream._internal.middlewares import BaseMiddleware
5from faststream.exceptions import IgnoredException
6from faststream.message.source_type import SourceType
8if TYPE_CHECKING:
9 from types import TracebackType
11 from faststream._internal.basic_types import AsyncFuncAny
12 from faststream._internal.context.repository import ContextRepo
13 from faststream._internal.logger import LoggerState
14 from faststream.message import StreamMessage
17class CriticalLogMiddleware:
18 def __init__(self, logger: "LoggerState") -> None:
19 """Initialize the class."""
20 self.logger = logger
22 def __call__(
23 self,
24 msg: Any | None,
25 /,
26 *,
27 context: "ContextRepo",
28 ) -> "_LoggingMiddleware":
29 return _LoggingMiddleware(
30 logger=self.logger,
31 msg=msg,
32 context=context,
33 )
class _LoggingMiddleware(BaseMiddleware):
    """Middleware that logs message receipt, completion and raised errors."""

    def __init__(
        self,
        *,
        logger: "LoggerState",
        context: "ContextRepo",
        msg: Any | None,
    ) -> None:
        """Initialise the base middleware and remember the logger.

        Args:
            logger: Logger state used for every record this middleware emits.
            context: Context repository passed on to ``BaseMiddleware``.
            msg: Message object passed on to ``BaseMiddleware``.
        """
        super().__init__(msg, context=context)
        self.logger = logger
        # Starts as CONSUME; ``consume_scope`` overwrites it with the
        # source type of the message it actually receives.
        self.source_type = SourceType.CONSUME

    async def consume_scope(
        self,
        call_next: "AsyncFuncAny",
        msg: "StreamMessage[Any]",
    ) -> Any:
        """Log "Received" for non-response messages, then run the next handler.

        Args:
            call_next: Next callable in the chain; awaited with ``msg``.
            msg: Message whose ``source_type`` decides whether to log.

        Returns:
            Whatever ``call_next(msg)`` returns.
        """
        incoming_type = msg.source_type
        # Remembered so ``__aexit__`` can apply the same RESPONSE check.
        self.source_type = incoming_type

        if incoming_type is not SourceType.RESPONSE:
            log_extra = self.context.get_local("log_context", {})
            self.logger.log("Received", extra=log_extra)

        return await call_next(msg)

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_val: BaseException | None = None,
        exc_tb: Optional["TracebackType"] = None,
    ) -> bool:
        """Log the processing outcome, then delegate to the base class.

        For any source type other than RESPONSE this logs the raised
        exception (if there is one) followed by a "Processed" record.

        Returns:
            Always ``False``: the exception is not marked as processed.
        """
        if self.source_type is not SourceType.RESPONSE:
            log_extra = self.context.get_local("log_context", {})

            # TODO: move critical logging to `subscriber.consume()` method
            if exc_type and issubclass(exc_type, IgnoredException):
                # Ignored exceptions: message only, no explicit level or
                # traceback.
                self.logger.log(
                    message=str(exc_val),
                    extra=log_extra,
                )
            elif exc_type:
                # Any other exception is logged at ERROR with exc_info.
                self.logger.log(
                    message=f"{exc_type.__name__}: {exc_val}",
                    log_level=logging.ERROR,
                    exc_info=exc_val,
                    extra=log_extra,
                )

            self.logger.log(message="Processed", extra=log_extra)

        await super().__aexit__(exc_type, exc_val, exc_tb)

        # Exception was not processed
        return False