Coverage for faststream / confluent / publisher / producer.py: 86%

55 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from abc import abstractmethod 

2from typing import TYPE_CHECKING, Any, Optional 

3 

4from typing_extensions import override 

5 

6from faststream._internal.endpoint.utils import ParserComposition 

7from faststream._internal.producer import ProducerProto 

8from faststream.confluent.parser import AsyncConfluentParser 

9from faststream.confluent.response import KafkaPublishCommand 

10from faststream.exceptions import FeatureNotSupportedException 

11from faststream.message import encode_message 

12 

13from .state import EmptyProducerState, ProducerState, RealProducer 

14 

15if TYPE_CHECKING: 

16 import asyncio 

17 

18 from confluent_kafka import Message 

19 from fast_depends.library.serializer import SerializerProto 

20 

21 from faststream._internal.types import CustomCallable 

22 from faststream.confluent.helpers.client import AsyncConfluentProducer 

23 

24 

25class AsyncConfluentFastProducer(ProducerProto[KafkaPublishCommand]): 

26 """A class to represent Kafka producer.""" 

27 

28 def connect( 

29 self, 

30 producer: "AsyncConfluentProducer", 

31 serializer: Optional["SerializerProto"], 

32 ) -> None: ... 

33 

34 def __bool__(self) -> bool: 

35 return False 

36 

37 async def disconnect(self) -> None: 

38 return None 

39 

40 async def flush(self) -> None: 

41 return None 

42 

43 @abstractmethod 

44 async def ping(self, timeout: float) -> bool: 

45 return False 

46 

47 @override 

48 @abstractmethod 

49 async def publish( 

50 self, 

51 cmd: "KafkaPublishCommand", 

52 ) -> "asyncio.Future[Message | None] | Message | None": ... 

53 

54 @override 

55 @abstractmethod 

56 async def publish_batch(self, cmd: "KafkaPublishCommand") -> None: ... 

57 

58 @override 

59 async def request(self, cmd: "KafkaPublishCommand") -> Any: 

60 msg = "Kafka doesn't support `request` method without test client." 

61 raise FeatureNotSupportedException(msg) 

62 

63 

64class FakeConfluentFastProducer(AsyncConfluentFastProducer): 

65 def connect( 

66 self, 

67 producer: "AsyncConfluentProducer", 

68 serializer: Optional["SerializerProto"], 

69 ) -> None: 

70 raise NotImplementedError 

71 

72 async def disconnect(self) -> None: 

73 raise NotImplementedError 

74 

75 async def flush(self) -> None: 

76 raise NotImplementedError 

77 

78 async def ping(self, timeout: float) -> bool: 

79 raise NotImplementedError 

80 

81 @override 

82 async def publish( 

83 self, 

84 cmd: "KafkaPublishCommand", 

85 ) -> "asyncio.Future[Message | None] | Message | None": 

86 raise NotImplementedError 

87 

88 @override 

89 async def publish_batch(self, cmd: "KafkaPublishCommand") -> None: 

90 raise NotImplementedError 

91 

92 

93class AsyncConfluentFastProducerImpl(AsyncConfluentFastProducer): 

94 """A class to represent Kafka producer.""" 

95 

96 def __init__( 

97 self, 

98 parser: Optional["CustomCallable"], 

99 decoder: Optional["CustomCallable"], 

100 ) -> None: 

101 self._producer: ProducerState = EmptyProducerState() 

102 self.serializer: SerializerProto | None = None 

103 

104 # NOTE: register default parser to be compatible with request 

105 default = AsyncConfluentParser() 

106 self._parser = ParserComposition(parser, default.parse_message) 

107 self._decoder = ParserComposition(decoder, default.decode_message) 

108 

109 def connect( 

110 self, 

111 producer: "AsyncConfluentProducer", 

112 serializer: Optional["SerializerProto"], 

113 ) -> None: 

114 self._producer = RealProducer(producer) 

115 self.serializer = serializer 

116 

117 async def disconnect(self) -> None: 

118 await self._producer.stop() 

119 self._producer = EmptyProducerState() 

120 

121 def __bool__(self) -> bool: 

122 return bool(self._producer) 

123 

124 async def ping(self, timeout: float) -> bool: 

125 return await self._producer.ping(timeout=timeout) 

126 

127 async def flush(self) -> None: 

128 await self._producer.flush() 

129 

130 @override 

131 async def publish( 

132 self, 

133 cmd: "KafkaPublishCommand", 

134 ) -> "asyncio.Future[Message | None] | Message | None": 

135 """Publish a message to a topic.""" 

136 message, content_type = encode_message(cmd.body, serializer=self.serializer) 

137 

138 headers_to_send = { 

139 "content-type": content_type or "", 

140 **cmd.headers_to_publish(), 

141 } 

142 

143 return await self._producer.producer.send( 

144 topic=cmd.destination, 

145 value=message, 

146 key=cmd.key, 

147 partition=cmd.partition, 

148 timestamp_ms=cmd.timestamp_ms, 

149 headers=[(i, (j or "").encode()) for i, j in headers_to_send.items()], 

150 no_confirm=cmd.no_confirm, 

151 ) 

152 

153 @override 

154 async def publish_batch(self, cmd: "KafkaPublishCommand") -> None: 

155 """Publish a batch of messages to a topic.""" 

156 batch = self._producer.producer.create_batch() 

157 

158 headers_to_send = cmd.headers_to_publish() 

159 

160 for message_position, msg in enumerate(cmd.batch_bodies): 

161 message, content_type = encode_message(msg, serializer=self.serializer) 

162 

163 if content_type: 163 ↛ 169line 163 didn't jump to line 169 because the condition on line 163 was always true

164 final_headers = { 

165 "content-type": content_type, 

166 **headers_to_send, 

167 } 

168 else: 

169 final_headers = headers_to_send.copy() 

170 

171 batch.append( 

172 key=cmd.key_for(message_position), 

173 value=message, 

174 timestamp=cmd.timestamp_ms, 

175 headers=[(i, j.encode()) for i, j in final_headers.items()], 

176 ) 

177 

178 await self._producer.producer.send_batch( 

179 batch, 

180 cmd.destination, 

181 partition=cmd.partition, 

182 no_confirm=cmd.no_confirm, 

183 )