Coverage for faststream / kafka / publisher / producer.py: 86%
66 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from abc import abstractmethod
2from typing import TYPE_CHECKING, Any, Optional, Union
4from typing_extensions import override
6from faststream._internal.endpoint.utils import ParserComposition
7from faststream._internal.producer import ProducerProto
8from faststream.exceptions import FeatureNotSupportedException
9from faststream.kafka.exceptions import BatchBufferOverflowException
10from faststream.kafka.message import KafkaMessage
11from faststream.kafka.parser import AioKafkaParser
12from faststream.kafka.response import KafkaPublishCommand
13from faststream.message import encode_message
15from .state import EmptyProducerState, ProducerState, RealProducer
17if TYPE_CHECKING:
18 import asyncio
20 from aiokafka import AIOKafkaProducer
21 from aiokafka.structs import RecordMetadata
22 from fast_depends.library.serializer import SerializerProto
24 from faststream._internal.types import CustomCallable
class AioKafkaFastProducer(ProducerProto[KafkaPublishCommand]):
    """Common interface shared by the Kafka producer implementations.

    The lifecycle and state members defined here are inert defaults:
    ``connect``/``disconnect``/``flush`` do nothing, the object is falsy and
    reports itself as closed. ``publish`` and ``publish_batch`` are abstract,
    and ``request`` always raises.
    """

    async def connect(
        self,
        producer: "AIOKafkaProducer",
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Accept a client and a serializer; does nothing in this base class."""

    async def disconnect(self) -> None:
        """Release the client; does nothing in this base class."""

    def __bool__(self) -> bool:
        """Return ``False``: the base producer is always falsy."""
        return False

    @property
    def closed(self) -> bool:
        """Return ``True``: the base producer always reports itself closed."""
        return True

    async def flush(self) -> None:
        """Do nothing and return ``None``."""
        return

    @abstractmethod
    async def publish(
        self,
        cmd: "KafkaPublishCommand",
    ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
        """Publish the message described by ``cmd`` (abstract)."""

    @abstractmethod
    async def publish_batch(
        self,
        cmd: "KafkaPublishCommand",
    ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
        """Publish the batch of messages described by ``cmd`` (abstract)."""

    async def request(self, cmd: "KafkaPublishCommand") -> Any:
        """Reject the call.

        Raises:
            FeatureNotSupportedException: always.
        """
        raise FeatureNotSupportedException(
            "Kafka doesn't support `request` method without test client.",
        )
class AioKafkaFastProducerImpl(AioKafkaFastProducer):
    """A class to represent Kafka producer."""

    def __init__(
        self,
        parser: Optional["CustomCallable"],
        decoder: Optional["CustomCallable"],
    ) -> None:
        """Create a producer that is not connected yet.

        Args:
            parser: Optional custom parser, composed with the default
                ``AioKafkaParser.parse_message``.
            decoder: Optional custom decoder, composed with the default
                ``AioKafkaParser.decode_message``.
        """
        # Start in the empty state; `connect` swaps in a `RealProducer`.
        self._producer: ProducerState = EmptyProducerState()
        self.serializer: Optional["SerializerProto"] = None

        # NOTE: register default parser to be compatible with request
        default = AioKafkaParser(msg_class=KafkaMessage, regex=None)
        self._parser = ParserComposition(parser, default.parse_message)
        self._decoder = ParserComposition(decoder, default.decode_message)

    async def connect(
        self,
        producer: "AIOKafkaProducer",
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Store the serializer, start ``producer`` and make it the active one.

        The producer state is only replaced after ``producer.start()`` has
        completed.
        """
        self.serializer = serializer
        await producer.start()
        self._producer = RealProducer(producer)

    async def disconnect(self) -> None:
        """Stop the current producer state and fall back to the empty state."""
        await self._producer.stop()
        self._producer = EmptyProducerState()

    def __bool__(self) -> bool:
        """Return the truthiness of the current producer state."""
        return bool(self._producer)

    @property
    def closed(self) -> bool:
        """Return the ``closed`` flag of the current producer state."""
        return self._producer.closed

    async def flush(self) -> None:
        """Delegate ``flush`` to the current producer state."""
        await self._producer.flush()

    @staticmethod
    def _encode_headers(
        headers: dict[str, Optional[str]],
    ) -> list[tuple[str, bytes]]:
        """Convert a header mapping to a list of ``(name, encoded value)`` pairs.

        Falsy values (``None`` or ``""``) are encoded as empty bytes so that
        single and batch publishing treat missing header values identically.
        """
        return [(name, (value or "").encode()) for name, value in headers.items()]

    @override
    async def publish(
        self,
        cmd: "KafkaPublishCommand",
    ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
        """Publish a message to a topic.

        Returns:
            The awaited result of the send when ``cmd.no_confirm`` is falsy,
            otherwise the not-yet-awaited future returned by the producer.
        """
        message, content_type = encode_message(cmd.body, serializer=self.serializer)

        # Headers from the command override the computed content type.
        headers_to_send = {
            "content-type": content_type or "",
            **cmd.headers_to_publish(),
        }

        send_future = await self._producer.producer.send(
            topic=cmd.destination,
            value=message,
            key=cmd.key,
            partition=cmd.partition,
            timestamp_ms=cmd.timestamp_ms,
            headers=self._encode_headers(headers_to_send),
        )

        if not cmd.no_confirm:
            return await send_future
        return send_future

    @override
    async def publish_batch(
        self,
        cmd: "KafkaPublishCommand",
    ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
        """Publish a batch of messages to a topic.

        Returns:
            The awaited result of the send when ``cmd.no_confirm`` is falsy,
            otherwise the not-yet-awaited future returned by the producer.

        Raises:
            BatchBufferOverflowException: if ``batch.append`` returns ``None``
                for a message; carries the position of that message.
        """
        batch = self._producer.producer.create_batch()

        # Shared by every message in the batch; never mutated below.
        headers_to_send = cmd.headers_to_publish()

        for message_position, body in enumerate(cmd.batch_bodies):
            message, content_type = encode_message(body, serializer=self.serializer)

            # Only add a content type when the encoder produced one; headers
            # from the command still take precedence.
            final_headers = (
                {"content-type": content_type, **headers_to_send}
                if content_type
                else headers_to_send
            )

            metadata = batch.append(
                key=cmd.key_for(message_position),
                value=message,
                timestamp=cmd.timestamp_ms,
                headers=self._encode_headers(final_headers),
            )
            if metadata is None:
                raise BatchBufferOverflowException(message_position=message_position)

        send_future = await self._producer.producer.send_batch(
            batch,
            cmd.destination,
            partition=cmd.partition,
        )
        if not cmd.no_confirm:
            return await send_future
        return send_future
class FakeAioKafkaFastProducer(AioKafkaFastProducer):
    """Placeholder producer: always falsy, every operation is unimplemented.

    All members except ``__bool__`` raise ``NotImplementedError``.
    """

    async def connect(
        self,
        producer: "AIOKafkaProducer",
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Not implemented for the fake producer."""
        raise NotImplementedError()

    async def disconnect(self) -> None:
        """Not implemented for the fake producer."""
        raise NotImplementedError()

    def __bool__(self) -> bool:
        """Return ``False``: the fake producer is always falsy."""
        return False

    @property
    def closed(self) -> bool:
        """Not implemented for the fake producer."""
        raise NotImplementedError()

    async def flush(self) -> None:
        """Not implemented for the fake producer."""
        raise NotImplementedError()

    async def publish(
        self,
        cmd: "KafkaPublishCommand",
    ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
        """Not implemented for the fake producer."""
        raise NotImplementedError()

    async def publish_batch(
        self,
        cmd: "KafkaPublishCommand",
    ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]:
        """Not implemented for the fake producer."""
        raise NotImplementedError()