Coverage for faststream / nats / broker / logging.py: 95%
17 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import logging
2from functools import partial
3from typing import TYPE_CHECKING, Any
5from faststream._internal.logger import DefaultLoggerStorage, make_logger_state
6from faststream._internal.logger.logging import get_broker_logger
8if TYPE_CHECKING:
9 from faststream._internal.basic_types import LoggerProto
10 from faststream._internal.context import ContextRepo
13class NatsParamsStorage(DefaultLoggerStorage):
14 def __init__(self) -> None:
15 super().__init__()
17 self._max_queue_len = 0
18 self._max_stream_len = 0
19 self._max_subject_len = 4
21 self.logger_log_level = logging.INFO
23 def set_level(self, level: int) -> None:
24 self.logger_log_level = level
26 def register_subscriber(self, params: dict[str, Any]) -> None:
27 self._max_subject_len = max(
28 (
29 self._max_subject_len,
30 len(params.get("subject", "")),
31 ),
32 )
33 self._max_queue_len = max(
34 (
35 self._max_queue_len,
36 len(params.get("queue", "")),
37 ),
38 )
39 self._max_stream_len = max(
40 (
41 self._max_stream_len,
42 len(params.get("stream", "")),
43 ),
44 )
46 def get_logger(self, *, context: "ContextRepo") -> "LoggerProto":
47 # TODO: generate unique logger names to not share between brokers
48 if not (lg := self._get_logger_ref()): 48 ↛ 80line 48 didn't jump to line 80 because the condition on line 48 was always true
49 message_id_ln = 10
51 lg = get_broker_logger(
52 name="nats",
53 default_context={
54 "subject": "",
55 "stream": "",
56 "queue": "",
57 },
58 message_id_ln=message_id_ln,
59 fmt="".join((
60 "%(asctime)s %(levelname)-8s - ",
61 (
62 f"%(stream)-{self._max_stream_len}s | "
63 if self._max_stream_len
64 else ""
65 ),
66 (
67 f"%(queue)-{self._max_queue_len}s | "
68 if self._max_queue_len
69 else ""
70 ),
71 f"%(subject)-{self._max_subject_len}s | ",
72 f"%(message_id)-{message_id_ln}s - ",
73 "%(message)s",
74 )),
75 context=context,
76 log_level=self.logger_log_level,
77 )
78 self._logger_ref.add(lg)
80 return lg
83make_nats_logger_state = partial(
84 make_logger_state,
85 default_storage_cls=NatsParamsStorage,
86)