Coverage for faststream / kafka / publisher / producer.py: 86%

66 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from abc import abstractmethod 

2from typing import TYPE_CHECKING, Any, Optional, Union 

3 

4from typing_extensions import override 

5 

6from faststream._internal.endpoint.utils import ParserComposition 

7from faststream._internal.producer import ProducerProto 

8from faststream.exceptions import FeatureNotSupportedException 

9from faststream.kafka.exceptions import BatchBufferOverflowException 

10from faststream.kafka.message import KafkaMessage 

11from faststream.kafka.parser import AioKafkaParser 

12from faststream.kafka.response import KafkaPublishCommand 

13from faststream.message import encode_message 

14 

15from .state import EmptyProducerState, ProducerState, RealProducer 

16 

17if TYPE_CHECKING: 

18 import asyncio 

19 

20 from aiokafka import AIOKafkaProducer 

21 from aiokafka.structs import RecordMetadata 

22 from fast_depends.library.serializer import SerializerProto 

23 

24 from faststream._internal.types import CustomCallable 

25 

26 

class AioKafkaFastProducer(ProducerProto[KafkaPublishCommand]):
    """Common interface of the Kafka producers defined in this module.

    The non-abstract members describe a producer that is not connected:
    it is falsy, reports itself as closed, and ``connect``, ``disconnect``
    and ``flush`` do nothing. Subclasses must provide ``publish`` and
    ``publish_batch``.
    """

    async def connect(
        self,
        producer: "AIOKafkaProducer",
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Attach a producer and serializer; the base implementation is a no-op."""

    async def disconnect(self) -> None:
        """Detach the producer; the base implementation is a no-op."""

    def __bool__(self) -> bool:
        """Return ``False``: the base producer is never considered usable."""
        return False

    @property
    def closed(self) -> bool:
        """Return ``True``: the base producer always reports itself closed."""
        return True

    async def flush(self) -> None:
        """Flush pending messages; the base implementation has nothing to flush."""

    @abstractmethod
    async def publish(
        self,
        cmd: "KafkaPublishCommand",
    ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
        """Publish the single message described by ``cmd``."""

    @abstractmethod
    async def publish_batch(
        self,
        cmd: "KafkaPublishCommand",
    ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
        """Publish the batch of messages described by ``cmd``."""

    async def request(self, cmd: "KafkaPublishCommand") -> Any:
        """Always raise ``FeatureNotSupportedException``.

        Raises:
            FeatureNotSupportedException: unconditionally.
        """
        reason = "Kafka doesn't support `request` method without test client."
        raise FeatureNotSupportedException(reason)

61 

62 

class AioKafkaFastProducerImpl(AioKafkaFastProducer):
    """Kafka producer that delegates to a connected ``AIOKafkaProducer``.

    The instance starts with an ``EmptyProducerState``. ``connect`` replaces
    it with a ``RealProducer`` wrapping the started client, and
    ``disconnect`` puts an ``EmptyProducerState`` back.
    """

    def __init__(
        self,
        parser: Optional["CustomCallable"],
        decoder: Optional["CustomCallable"],
    ) -> None:
        """Create a producer that is not yet connected.

        Args:
            parser: Optional custom parser, composed with the default
                ``AioKafkaParser.parse_message``.
            decoder: Optional custom decoder, composed with the default
                ``AioKafkaParser.decode_message``.
        """
        self._producer: ProducerState = EmptyProducerState()
        self.serializer: SerializerProto | None = None

        # NOTE: register default parser to be compatible with request
        default = AioKafkaParser(msg_class=KafkaMessage, regex=None)
        self._parser = ParserComposition(parser, default.parse_message)
        self._decoder = ParserComposition(decoder, default.decode_message)

    async def connect(
        self,
        producer: "AIOKafkaProducer",
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Store the serializer, start ``producer`` and keep it as the active state."""
        self.serializer = serializer
        await producer.start()
        self._producer = RealProducer(producer)

    async def disconnect(self) -> None:
        """Stop the current producer state and fall back to the empty state."""
        await self._producer.stop()
        self._producer = EmptyProducerState()

    def __bool__(self) -> bool:
        """Return the truthiness of the current producer state."""
        return bool(self._producer)

    @property
    def closed(self) -> bool:
        """Return the ``closed`` flag of the current producer state."""
        return self._producer.closed

    async def flush(self) -> None:
        """Flush the current producer state."""
        await self._producer.flush()

    @staticmethod
    def _encode_headers(headers: dict[str, str]) -> list[tuple[str, bytes]]:
        """Turn a header mapping into the ``(name, bytes)`` pairs handed to the client.

        Falsy values (``None`` or ``""``) are sent as empty bytes, so
        ``publish`` and ``publish_batch`` treat a missing header value the
        same way instead of only the batch path failing on ``None``.
        """
        return [(name, (value or "").encode()) for name, value in headers.items()]

    @override
    async def publish(
        self,
        cmd: "KafkaPublishCommand",
    ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
        """Publish a message to a topic.

        Args:
            cmd: Publish command supplying the body, destination topic, key,
                partition, timestamp and headers.

        Returns:
            The awaited result of the send future, or the un-awaited future
            itself when ``cmd.no_confirm`` is set.
        """
        message, content_type = encode_message(cmd.body, serializer=self.serializer)

        # Headers from the command come last so they override the
        # content-type derived from the encoded body.
        headers_to_send = {
            "content-type": content_type or "",
            **cmd.headers_to_publish(),
        }

        send_future = await self._producer.producer.send(
            topic=cmd.destination,
            value=message,
            key=cmd.key,
            partition=cmd.partition,
            timestamp_ms=cmd.timestamp_ms,
            headers=self._encode_headers(headers_to_send),
        )

        if not cmd.no_confirm:
            return await send_future
        return send_future

    @override
    async def publish_batch(
        self,
        cmd: "KafkaPublishCommand",
    ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
        """Publish a batch of messages to a topic.

        Args:
            cmd: Publish command supplying the batch bodies, per-message
                keys, destination topic, partition, timestamp and headers.

        Returns:
            The awaited result of the send future, or the un-awaited future
            itself when ``cmd.no_confirm`` is set.

        Raises:
            BatchBufferOverflowException: if appending a message to the batch
                returns ``None``; carries the position of that message.
        """
        batch = self._producer.producer.create_batch()

        headers_to_send = cmd.headers_to_publish()

        for message_position, body in enumerate(cmd.batch_bodies):
            message, content_type = encode_message(body, serializer=self.serializer)

            if content_type:
                # Command headers come last so they override the
                # content-type derived from the encoded body.
                final_headers = {
                    "content-type": content_type,
                    **headers_to_send,
                }
            else:
                final_headers = headers_to_send.copy()

            metadata = batch.append(
                key=cmd.key_for(message_position),
                value=message,
                timestamp=cmd.timestamp_ms,
                headers=self._encode_headers(final_headers),
            )
            if metadata is None:
                raise BatchBufferOverflowException(message_position=message_position)

        send_future = await self._producer.producer.send_batch(
            batch,
            cmd.destination,
            partition=cmd.partition,
        )
        if not cmd.no_confirm:
            return await send_future
        return send_future

166 

167 

class FakeAioKafkaFastProducer(AioKafkaFastProducer):
    """Stand-in producer whose every operation is unimplemented.

    The instance is falsy, and every other member overridden here raises
    ``NotImplementedError``.
    NOTE(review): presumably a placeholder to be swapped out or patched
    elsewhere (e.g. by a test client); confirm against the callers.
    """

    async def connect(
        self,
        producer: "AIOKafkaProducer",
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError

    async def disconnect(self) -> None:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def __bool__(self) -> bool:
        """Return ``False``: the fake producer is never considered usable."""
        return False

    @property
    def closed(self) -> bool:
        """Not supported; reading the property raises ``NotImplementedError``."""
        raise NotImplementedError

    async def flush(self) -> None:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError

    async def publish(
        self,
        cmd: "KafkaPublishCommand",
    ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError

    async def publish_batch(
        self,
        cmd: "KafkaPublishCommand",
    ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError