Coverage for faststream / confluent / publisher / producer.py: 86%
55 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from abc import abstractmethod
2from typing import TYPE_CHECKING, Any, Optional
4from typing_extensions import override
6from faststream._internal.endpoint.utils import ParserComposition
7from faststream._internal.producer import ProducerProto
8from faststream.confluent.parser import AsyncConfluentParser
9from faststream.confluent.response import KafkaPublishCommand
10from faststream.exceptions import FeatureNotSupportedException
11from faststream.message import encode_message
13from .state import EmptyProducerState, ProducerState, RealProducer
15if TYPE_CHECKING:
16 import asyncio
18 from confluent_kafka import Message
19 from fast_depends.library.serializer import SerializerProto
21 from faststream._internal.types import CustomCallable
22 from faststream.confluent.helpers.client import AsyncConfluentProducer
class AsyncConfluentFastProducer(ProducerProto[KafkaPublishCommand]):
    """Common base for the Confluent Kafka producer variants in this module.

    Supplies do-nothing defaults for ``connect``, ``disconnect`` and ``flush``,
    evaluates as falsy, marks ``ping``, ``publish`` and ``publish_batch`` with
    ``@abstractmethod``, and rejects ``request`` outright.
    """

    def connect(
        self,
        producer: "AsyncConfluentProducer",
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Accept a client and serializer; the base implementation ignores both."""

    def __bool__(self) -> bool:
        """Report the base producer as falsy."""
        return False

    async def disconnect(self) -> None:
        """Do nothing; subclasses override this to release their client."""

    async def flush(self) -> None:
        """Do nothing; subclasses override this to flush their client."""

    @abstractmethod
    async def ping(self, timeout: float) -> bool:
        """Return ``False`` by default; subclasses provide the real check."""
        return False

    @override
    @abstractmethod
    async def publish(
        self,
        cmd: "KafkaPublishCommand",
    ) -> "asyncio.Future[Message | None] | Message | None":
        """Publish the single message described by ``cmd`` (no base behaviour)."""

    @override
    @abstractmethod
    async def publish_batch(self, cmd: "KafkaPublishCommand") -> None:
        """Publish the batch described by ``cmd`` (no base behaviour)."""

    @override
    async def request(self, cmd: "KafkaPublishCommand") -> Any:
        """Always fail: this producer does not implement ``request``.

        Raises:
            FeatureNotSupportedException: on every call.
        """
        reason = "Kafka doesn't support `request` method without test client."
        raise FeatureNotSupportedException(reason)
class FakeConfluentFastProducer(AsyncConfluentFastProducer):
    """Placeholder producer: every operation raises ``NotImplementedError``.

    ``__bool__`` is not overridden here, so instances keep the falsy result
    inherited from ``AsyncConfluentFastProducer``.
    """

    def connect(
        self,
        producer: "AsyncConfluentProducer",
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Unsupported on the placeholder producer."""
        raise NotImplementedError

    async def disconnect(self) -> None:
        """Unsupported on the placeholder producer."""
        raise NotImplementedError

    async def flush(self) -> None:
        """Unsupported on the placeholder producer."""
        raise NotImplementedError

    async def ping(self, timeout: float) -> bool:
        """Unsupported on the placeholder producer."""
        raise NotImplementedError

    @override
    async def publish(
        self,
        cmd: "KafkaPublishCommand",
    ) -> "asyncio.Future[Message | None] | Message | None":
        """Unsupported on the placeholder producer."""
        raise NotImplementedError

    @override
    async def publish_batch(self, cmd: "KafkaPublishCommand") -> None:
        """Unsupported on the placeholder producer."""
        raise NotImplementedError
class AsyncConfluentFastProducerImpl(AsyncConfluentFastProducer):
    """Producer that forwards publish commands to a connected Confluent client.

    The instance starts with an ``EmptyProducerState`` and no serializer.
    ``connect`` swaps in a ``RealProducer`` wrapping the supplied client;
    ``disconnect`` stops it and restores the empty state.
    """

    def __init__(
        self,
        parser: Optional["CustomCallable"],
        decoder: Optional["CustomCallable"],
    ) -> None:
        """Create a disconnected producer.

        Args:
            parser: Optional custom parser, composed with the default
                ``AsyncConfluentParser.parse_message``.
            decoder: Optional custom decoder, composed with the default
                ``AsyncConfluentParser.decode_message``.
        """
        self._producer: ProducerState = EmptyProducerState()
        self.serializer: SerializerProto | None = None

        # NOTE: register default parser to be compatible with request
        default = AsyncConfluentParser()
        self._parser = ParserComposition(parser, default.parse_message)
        self._decoder = ParserComposition(decoder, default.decode_message)

    def connect(
        self,
        producer: "AsyncConfluentProducer",
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Wrap ``producer`` in a ``RealProducer`` and remember ``serializer``."""
        self._producer = RealProducer(producer)
        self.serializer = serializer

    async def disconnect(self) -> None:
        """Stop the current producer state, then reset to the empty state."""
        await self._producer.stop()
        self._producer = EmptyProducerState()

    def __bool__(self) -> bool:
        """Return the truthiness of the current producer state."""
        return bool(self._producer)

    async def ping(self, timeout: float) -> bool:
        """Delegate the ping to the current producer state."""
        return await self._producer.ping(timeout=timeout)

    async def flush(self) -> None:
        """Delegate the flush to the current producer state."""
        await self._producer.flush()

    @override
    async def publish(
        self,
        cmd: "KafkaPublishCommand",
    ) -> "asyncio.Future[Message | None] | Message | None":
        """Publish a message to a topic.

        The body is encoded with ``encode_message``. A ``content-type`` header
        is always included (empty string when no content type was detected)
        and may be overridden by the command's own headers.

        Returns:
            Whatever the underlying client's ``send`` call returns.
        """
        message, content_type = encode_message(cmd.body, serializer=self.serializer)

        headers_to_send = {
            "content-type": content_type or "",
            **cmd.headers_to_publish(),
        }

        return await self._producer.producer.send(
            topic=cmd.destination,
            value=message,
            key=cmd.key,
            partition=cmd.partition,
            timestamp_ms=cmd.timestamp_ms,
            # Falsy header values (e.g. ``None``) are sent as empty bytes.
            headers=[
                (name, (value or "").encode())
                for name, value in headers_to_send.items()
            ],
            no_confirm=cmd.no_confirm,
        )

    @override
    async def publish_batch(self, cmd: "KafkaPublishCommand") -> None:
        """Publish a batch of messages to a topic.

        Each body in ``cmd.batch_bodies`` is encoded separately and appended
        to one client batch with the command's headers. A ``content-type``
        header is added only when a content type was detected for that body.
        The batch is then sent to ``cmd.destination``.
        """
        batch = self._producer.producer.create_batch()

        headers_to_send = cmd.headers_to_publish()

        for message_position, msg in enumerate(cmd.batch_bodies):
            message, content_type = encode_message(msg, serializer=self.serializer)

            if content_type:
                final_headers = {
                    "content-type": content_type,
                    **headers_to_send,
                }
            else:
                # Only read below, so the shared mapping can be used directly.
                final_headers = headers_to_send

            batch.append(
                key=cmd.key_for(message_position),
                value=message,
                timestamp=cmd.timestamp_ms,
                # Same encoding as ``publish``: falsy header values become
                # empty bytes instead of raising on ``None.encode()``.
                headers=[
                    (name, (value or "").encode())
                    for name, value in final_headers.items()
                ],
            )

        await self._producer.producer.send_batch(
            batch,
            cmd.destination,
            partition=cmd.partition,
            no_confirm=cmd.no_confirm,
        )