Coverage for faststream / mqtt / broker / logging.py: 94%
15 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import logging
2from functools import partial
3from typing import TYPE_CHECKING, Any
5from faststream._internal.logger import DefaultLoggerStorage, make_logger_state
6from faststream._internal.logger.logging import get_broker_logger
8if TYPE_CHECKING:
9 from faststream._internal.basic_types import LoggerProto
10 from faststream._internal.context import ContextRepo
class MQTTParamsStorage(DefaultLoggerStorage):
    """Logger storage that sizes the MQTT log columns from registered subscribers.

    The storage tracks the longest ``topic`` and ``shared`` values seen so far
    and uses them as column widths when the broker logger is built.
    """

    def __init__(self) -> None:
        super().__init__()

        # Starting column widths; they only ever grow (see register_subscriber).
        self._max_topic_len = 4
        self._max_shared_len = 0
        self.logger_log_level = logging.INFO

    def set_level(self, level: int) -> None:
        """Store the log level passed to the logger built by ``get_logger``."""
        self.logger_log_level = level

    def register_subscriber(self, params: dict[str, Any]) -> None:
        """Widen the log columns to fit a subscriber's parameters.

        Args:
            params: Subscriber parameters. Only the ``"topic"`` and
                ``"shared"`` keys are read; a missing key or an explicit
                ``None`` value counts as an empty string.
        """
        # `dict.get(key, "")` still returns None when the key is present with a
        # None value, so normalise both fields with `or ""` before measuring.
        topic = params.get("topic") or ""
        shared = params.get("shared") or ""

        self._max_topic_len = max(self._max_topic_len, len(topic))
        self._max_shared_len = max(self._max_shared_len, len(shared))

    def get_logger(self, *, context: "ContextRepo") -> "LoggerProto":
        """Return the broker logger, building it on first use.

        Args:
            context: Context repository forwarded to ``get_broker_logger``.

        Returns:
            The logger returned by the inherited ``_get_logger_ref`` when it is
            truthy, otherwise a newly built logger.
        """
        if not (lg := self._get_logger_ref()):
            message_id_ln = 10

            # The "shared" column is emitted only when at least one registered
            # subscriber supplied a non-empty shared value.
            shared_column = (
                f"%(shared)-{self._max_shared_len}s | "
                if self._max_shared_len
                else ""
            )

            # Column widths are baked into the format string here, so they
            # reflect the subscribers registered up to this point.
            lg = get_broker_logger(
                name="mqtt",
                default_context={
                    "topic": "",
                    "shared": "",
                },
                message_id_ln=message_id_ln,
                fmt="".join((
                    "%(asctime)s %(levelname)-8s - ",
                    shared_column,
                    f"%(topic)-{self._max_topic_len}s | ",
                    f"%(message_id)-{message_id_ln}s - ",
                    "%(message)s",
                )),
                context=context,
                log_level=self.logger_log_level,
            )
            # Remember the logger in the inherited reference container so the
            # lookup above can find it on later calls.
            self._logger_ref.add(lg)

        return lg
# Logger-state factory for the MQTT broker: `make_logger_state` with
# `MQTTParamsStorage` pre-bound as the default storage class.
_mqtt_logger_state_defaults = {"default_storage_cls": MQTTParamsStorage}
make_mqtt_logger_state = partial(make_logger_state, **_mqtt_logger_state_defaults)