Coverage for faststream / kafka / helpers / rebalance_listener.py: 87%

32 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2import logging 

3from typing import TYPE_CHECKING, Any, Optional 

4 

5from aiokafka import ConsumerRebalanceListener 

6 

7from faststream._internal.utils.functions import call_or_await 

8 

9if TYPE_CHECKING: 

10 from aiokafka import AIOKafkaConsumer, TopicPartition 

11 

12 from faststream._internal.basic_types import LoggerProto 

13 

14 

def make_logging_listener(
    *,
    consumer: "AIOKafkaConsumer",
    logger: Optional["LoggerProto"],
    log_extra: dict[str, Any],
    listener: Optional["ConsumerRebalanceListener"],
) -> Optional["ConsumerRebalanceListener"]:
    """Build the rebalance listener to use for ``consumer``.

    Args:
        consumer: Consumer whose rebalances should be logged.
        logger: Logger to report through; when ``None`` no logging
            listener is created.
        log_extra: Mapping passed as ``extra`` to every log call.
        listener: Optional user-supplied rebalance listener.

    Returns:
        ``listener`` unchanged when ``logger`` is ``None``; a logging
        listener when only a logger is given; otherwise a facade that
        combines the logging listener with ``listener``.
    """
    if logger is not None:
        rebalance_logger = _LoggingListener(
            consumer=consumer,
            logger=logger,
            log_extra=log_extra,
        )
        if listener is not None:
            # Both present: wrap them so each gets notified.
            return _LoggingListenerFacade(
                logging_listener=rebalance_logger,
                listener=listener,
            )
        return rebalance_logger

    # Without a logger there is nothing to add; hand back the caller's listener.
    return listener

37 

38 

class _LoggingListener(ConsumerRebalanceListener):  # type: ignore[misc]
    """Rebalance listener that logs partition assignments of a consumer.

    Every assignment is logged at INFO level. When the assignment is empty
    a WARNING is logged immediately and a delayed follow-up WARNING is
    scheduled; the follow-up is cancelled if partitions are assigned before
    the delay elapses.
    """

    # Seconds to wait before emitting the follow-up "still unassigned" warning.
    _log_unassigned_consumer_delay_seconds = 60 * 2
    # Task running the delayed follow-up warning, or None when none is scheduled.
    _log_unassigned_consumer_task: asyncio.Task[None] | None

    def __init__(
        self,
        *,
        consumer: "AIOKafkaConsumer",
        logger: "LoggerProto",
        log_extra: dict[str, Any],
    ) -> None:
        """Store the consumer, the logger and the ``extra`` mapping for log calls."""
        self.consumer = consumer
        self.logger = logger
        self.log_extra = log_extra
        self._log_unassigned_consumer_task = None

    async def on_partitions_revoked(self, revoked: set["TopicPartition"]) -> None:
        """Do nothing; revocations are not logged."""

    async def log_unassigned_consumer(self) -> None:
        """Sleep for the configured delay, then warn that the consumer is unassigned."""
        await asyncio.sleep(self._log_unassigned_consumer_delay_seconds)
        self.logger.log(
            logging.WARNING,
            f"Consumer in group {self.consumer._group_id} has had no partition "
            f"assignments for {self._log_unassigned_consumer_delay_seconds} seconds: "
            f"topics {self.consumer._subscription.topics} may have fewer partitions "
            f"than consumers.",
            extra=self.log_extra,
        )

    def _cancel_unassigned_warning(self) -> None:
        """Cancel the pending follow-up warning task, if any, and forget it."""
        pending = self._log_unassigned_consumer_task
        if pending is not None:
            # Cancelling an already finished task is a harmless no-op.
            pending.cancel()
            self._log_unassigned_consumer_task = None

    async def on_partitions_assigned(self, assigned: set["TopicPartition"]) -> None:
        """Log the new assignment and manage the delayed unassigned warning.

        Args:
            assigned: Partitions now assigned to the consumer; may be empty.
        """
        self.logger.log(
            logging.INFO,
            f"Consumer {self.consumer._coordinator.member_id} assigned to partitions: "
            f"({', '.join(str(tp) for tp in assigned)})",
            extra=self.log_extra,
        )

        # Always drop a previously scheduled follow-up first: either the
        # consumer now has partitions, or a fresh follow-up is scheduled
        # below. This keeps at most one pending task, so a stale task from an
        # earlier empty assignment cannot fire after partitions were assigned.
        self._cancel_unassigned_warning()

        if assigned:
            return

        self.logger.log(
            logging.WARNING,
            f"Consumer in group {self.consumer._group_id} has no partition assignments - this "
            f"could be temporary, e.g. during a rolling update. A separate warning will be logged if "
            f"this condition persists for {self._log_unassigned_consumer_delay_seconds} seconds.",
            extra=self.log_extra,
        )

        # Keep a reference on the instance so the task can be cancelled later.
        self._log_unassigned_consumer_task = asyncio.create_task(
            self.log_unassigned_consumer()
        )

93 

94 

class _LoggingListenerFacade(ConsumerRebalanceListener):  # type: ignore[misc]
    """Combine the logging listener with a user-supplied rebalance listener.

    Assignments are reported to the logging listener first and then to the
    user's listener. Revocations are forwarded to the user's listener only.
    """

    def __init__(
        self,
        *,
        logging_listener: _LoggingListener,
        listener: ConsumerRebalanceListener,
    ) -> None:
        """Store both listeners.

        Args:
            logging_listener: Listener that logs assignments.
            listener: User-supplied listener to forward events to.
        """
        self.logging_listener = logging_listener
        self.listener = listener

    async def on_partitions_revoked(self, revoked: set["TopicPartition"]) -> None:
        """Forward the revocation to the user's listener."""
        # NOTE(review): call_or_await presumably supports both sync and async
        # callbacks - confirm against faststream._internal.utils.functions.
        await call_or_await(self.listener.on_partitions_revoked, revoked)

    async def on_partitions_assigned(self, assigned: set["TopicPartition"]) -> None:
        """Log the assignment, then forward it to the user's listener.

        Args:
            assigned: Partitions now assigned to the consumer.
        """
        # Must call the *assigned* hook of the logging listener: its revoked
        # hook is a no-op, so calling it here would silently drop all
        # assignment logging whenever a user listener is configured.
        await self.logging_listener.on_partitions_assigned(assigned)
        await call_or_await(self.listener.on_partitions_assigned, assigned)