Coverage for faststream / kafka / helpers / rebalance_listener.py: 87%
32 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2import logging
3from typing import TYPE_CHECKING, Any, Optional
5from aiokafka import ConsumerRebalanceListener
7from faststream._internal.utils.functions import call_or_await
9if TYPE_CHECKING:
10 from aiokafka import AIOKafkaConsumer, TopicPartition
12 from faststream._internal.basic_types import LoggerProto
def make_logging_listener(
    *,
    consumer: "AIOKafkaConsumer",
    logger: Optional["LoggerProto"],
    log_extra: dict[str, Any],
    listener: Optional["ConsumerRebalanceListener"],
) -> Optional["ConsumerRebalanceListener"]:
    """Build the rebalance listener to register, adding logging when possible.

    Args:
        consumer: Consumer whose rebalances are reported.
        logger: Logger used for rebalance messages; ``None`` disables logging.
        log_extra: Mapping passed as ``extra`` to every log call.
        listener: Optional user-supplied rebalance listener.

    Returns:
        ``listener`` unchanged when there is no logger; a bare logging listener
        when there is no user listener; otherwise a facade wrapping both.
    """
    # Without a logger there is nothing to add: hand back the caller's listener
    # (which may itself be None).
    if logger is None:
        return listener

    rebalance_logger = _LoggingListener(
        consumer=consumer,
        logger=logger,
        log_extra=log_extra,
    )
    return (
        rebalance_logger
        if listener is None
        else _LoggingListenerFacade(
            logging_listener=rebalance_logger,
            listener=listener,
        )
    )
class _LoggingListener(ConsumerRebalanceListener):  # type: ignore[misc]
    """Rebalance listener that logs the partitions assigned to a consumer.

    When an assignment is empty, a warning is logged immediately and a delayed
    task is scheduled that logs a second warning if no new assignment arrives
    within ``_log_unassigned_consumer_delay_seconds``.
    """

    # Seconds to wait before logging the follow-up "still unassigned" warning.
    _log_unassigned_consumer_delay_seconds = 60 * 2
    # Pending delayed-warning task, or None when none has been scheduled.
    _log_unassigned_consumer_task: asyncio.Task[None] | None

    def __init__(
        self,
        *,
        consumer: "AIOKafkaConsumer",
        logger: "LoggerProto",
        log_extra: dict[str, Any],
    ) -> None:
        """Store the consumer, the logger and the ``extra`` mapping for log calls."""
        self.consumer = consumer
        self.logger = logger
        self.log_extra = log_extra
        self._log_unassigned_consumer_task = None

    async def on_partitions_revoked(self, revoked: set["TopicPartition"]) -> None:
        """Do nothing: revocations are not logged."""

    async def log_unassigned_consumer(self) -> None:
        """Sleep for the configured delay, then warn that the consumer is still unassigned.

        Intended to run as a background task that is cancelled when a later
        rebalance supersedes it.
        """
        await asyncio.sleep(self._log_unassigned_consumer_delay_seconds)
        self.logger.log(
            logging.WARNING,
            f"Consumer in group {self.consumer._group_id} has had no partition "
            f"assignments for {self._log_unassigned_consumer_delay_seconds} seconds: "
            f"topics {self.consumer._subscription.topics} may have fewer partitions "
            f"than consumers.",
            extra=self.log_extra,
        )

    async def on_partitions_assigned(self, assigned: set["TopicPartition"]) -> None:
        """Log the new assignment and (re)arm the unassigned-consumer warning.

        Args:
            assigned: Partitions now assigned to the consumer; may be empty.
        """
        self.logger.log(
            logging.INFO,
            f"Consumer {self.consumer._coordinator.member_id} assigned to partitions: "
            f"({', '.join(str(tp) for tp in assigned)})",
            extra=self.log_extra,
        )

        # Every new assignment supersedes a previously scheduled warning.
        # Cancelling here (not only when partitions were assigned) keeps
        # consecutive empty assignments from leaving orphaned tasks behind
        # that would later emit duplicate or stale warnings.
        self._cancel_unassigned_consumer_task()

        if not assigned:
            self.logger.log(
                logging.WARNING,
                f"Consumer in group {self.consumer._group_id} has no partition assignments - this "
                f"could be temporary, e.g. during a rolling update. A separate warning will be logged if "
                f"this condition persists for {self._log_unassigned_consumer_delay_seconds} seconds.",
                extra=self.log_extra,
            )
            self._log_unassigned_consumer_task = asyncio.create_task(
                self.log_unassigned_consumer(),
            )

    def _cancel_unassigned_consumer_task(self) -> None:
        """Cancel and forget the pending delayed-warning task, if there is one."""
        pending = self._log_unassigned_consumer_task
        if pending is not None:
            # cancel() on an already finished task is a harmless no-op.
            pending.cancel()
            self._log_unassigned_consumer_task = None
class _LoggingListenerFacade(ConsumerRebalanceListener):  # type: ignore[misc]
    """Rebalance listener that runs the logging listener and a user listener together.

    For each rebalance event the logging listener is notified first, then the
    user-supplied listener's matching hook is invoked through ``call_or_await``
    (presumably so that hook may be either sync or async - confirm against
    ``call_or_await``).
    """

    def __init__(
        self,
        *,
        logging_listener: _LoggingListener,
        listener: ConsumerRebalanceListener,
    ) -> None:
        """Store both listeners.

        Args:
            logging_listener: Listener responsible for rebalance logging.
            listener: User-supplied listener to delegate to afterwards.
        """
        self.logging_listener = logging_listener
        self.listener = listener

    async def on_partitions_revoked(self, revoked: set["TopicPartition"]) -> None:
        """Forward a revocation to the logging listener, then to the user listener."""
        # Kept symmetric with on_partitions_assigned so the logging listener
        # sees every event the user listener sees.
        await self.logging_listener.on_partitions_revoked(revoked)
        await call_or_await(self.listener.on_partitions_revoked, revoked)

    async def on_partitions_assigned(self, assigned: set["TopicPartition"]) -> None:
        """Forward an assignment to the logging listener, then to the user listener."""
        # Must be the *assigned* hook: routing this to on_partitions_revoked
        # would silently drop assignment logging and the unassigned warning.
        await self.logging_listener.on_partitions_assigned(assigned)
        await call_or_await(self.listener.on_partitions_assigned, assigned)