Coverage for tests / brokers / kafka / test_test_client.py: 99%
167 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from unittest.mock import AsyncMock, patch
4import pytest
6from faststream import AckPolicy, BaseMiddleware
7from faststream.kafka import TopicPartition
8from faststream.kafka.annotations import KafkaMessage
9from faststream.kafka.message import FAKE_CONSUMER
10from faststream.kafka.testing import FakeProducer
11from tests.brokers.base.testclient import BrokerTestclientTestcase
12from tests.tools import spy_decorator
14from .basic import KafkaMemoryTestcaseConfig
@pytest.mark.kafka()
@pytest.mark.asyncio()
class TestTestclient(KafkaMemoryTestcaseConfig, BrokerTestclientTestcase):
    """Kafka-specific tests for the broker test client.

    Every test builds a broker with ``self.get_broker()`` and runs it through
    ``self.patch_broker(...)``.

    NOTE(review): assertions on ``.mock`` attributes are kept inside the
    ``async with self.patch_broker(...)`` body; presumably the test client
    detaches handler/publisher mocks when the context exits -- confirm
    against the test client implementation.
    """

    async def test_partition_match(
        self,
        queue: str,
    ) -> None:
        """A publish without an explicit partition reaches a partition-1 subscriber."""
        broker = self.get_broker()

        @broker.subscriber(partitions=[TopicPartition(queue, 1)])
        async def m(msg) -> None:
            pass

        async with self.patch_broker(broker) as br:
            await br.publish("hello", queue)

            m.mock.assert_called_once_with("hello")

    # NOTE: "exect" is a typo for "exact"; the name is kept so the pytest
    # node id of this test does not change.
    async def test_partition_match_exect(
        self,
        queue: str,
    ) -> None:
        """A publish to partition 1 reaches the subscriber bound to partition 1."""
        broker = self.get_broker()

        @broker.subscriber(partitions=[TopicPartition(queue, 1)])
        async def m(msg) -> None:
            pass

        async with self.patch_broker(broker) as br:
            await br.publish("hello", queue, partition=1)

            m.mock.assert_called_once_with("hello")

    async def test_partition_mismatch(
        self,
        queue: str,
    ) -> None:
        """A publish to partition 2 skips the partition-1 subscriber.

        The plain topic subscriber ``m2`` must still receive the message.
        """
        broker = self.get_broker()

        @broker.subscriber(partitions=[TopicPartition(queue, 1)])
        async def m(msg) -> None:
            pass

        @broker.subscriber(queue)
        async def m2(msg) -> None:
            pass

        async with self.patch_broker(broker) as br:
            await br.publish("hello", queue, partition=2)

            assert not m.mock.called
            m2.mock.assert_called_once_with("hello")

    async def test_message_nack_seek(
        self,
        queue: str,
    ) -> None:
        """``msg.nack()`` under ``AckPolicy.MANUAL`` calls ``FAKE_CONSUMER.seek`` once."""
        broker = self.get_broker(apply_types=True)

        @broker.subscriber(queue, group_id=f"{queue}1", ack_policy=AckPolicy.MANUAL)
        async def m(msg: KafkaMessage) -> None:
            await msg.nack()

        async with self.patch_broker(broker) as br:
            # Wrap the real ``seek`` in a spy so calls to it can be counted.
            with patch.object(
                FAKE_CONSUMER,
                "seek",
                spy_decorator(FAKE_CONSUMER.seek),
            ) as mocked:
                await br.publish("hello", queue)
                mocked.mock.assert_called_once()

    async def test_publisher_autoflush_mock(
        self,
        queue: str,
    ) -> None:
        """An ``autoflush=True`` publisher awaits ``flush`` once per handler reply."""
        broker = self.get_broker()

        publisher = broker.publisher(queue + "1", autoflush=True)
        # Replace ``flush`` so that awaiting it can be asserted below.
        publisher.flush = AsyncMock()

        @publisher
        @broker.subscriber(queue)
        async def m(msg):
            return 1

        async with self.patch_broker(broker) as br:
            await br.publish("hello", queue)

            m.mock.assert_called_once_with("hello")
            publisher.mock.assert_called_once_with(1)

            publisher.flush.assert_awaited_once()

    async def test_batch_publisher_autoflush_mock(
        self,
        queue: str,
    ) -> None:
        """A batch ``autoflush=True`` publisher awaits ``flush`` once for the batch."""
        broker = self.get_broker()

        publisher = broker.publisher(queue + "1", batch=True, autoflush=True)
        # Replace ``flush`` so that awaiting it can be asserted below.
        publisher.flush = AsyncMock()

        @publisher
        @broker.subscriber(queue)
        async def m(msg):
            return 1, 2, 3

        async with self.patch_broker(broker) as br:
            await br.publish("hello", queue)

            m.mock.assert_called_once_with("hello")
            publisher.mock.assert_called_once_with([1, 2, 3])

            publisher.flush.assert_awaited_once()

    @pytest.mark.connected()
    async def test_with_real_testclient(
        self,
        queue: str,
    ) -> None:
        """With ``with_real=True`` a published message reaches the handler within 3s."""
        event = asyncio.Event()

        broker = self.get_broker()

        @broker.subscriber(queue)
        def subscriber(m) -> None:
            event.set()

        async with self.patch_broker(broker, with_real=True) as br:
            _, pending = await asyncio.wait(
                (
                    asyncio.create_task(br.publish("hello", queue)),
                    asyncio.create_task(event.wait()),
                ),
                timeout=3,
            )
            # ``asyncio.wait`` does not cancel tasks on timeout; cancel the
            # leftovers so they do not outlive the test.
            for task in pending:
                task.cancel()

        assert event.is_set()

    async def test_batch_pub_by_default_pub(
        self,
        queue: str,
    ) -> None:
        """A single ``publish`` is delivered to a batch subscriber as a one-item list."""
        broker = self.get_broker()

        @broker.subscriber(queue, batch=True)
        async def m(msg) -> None:
            pass

        async with self.patch_broker(broker) as br:
            await br.publish("hello", queue)
            m.mock.assert_called_once_with(["hello"])

    async def test_batch_pub_by_pub_batch(
        self,
        queue: str,
    ) -> None:
        """``publish_batch`` with one message is delivered as a one-item list."""
        broker = self.get_broker()

        @broker.subscriber(queue, batch=True)
        async def m(msg) -> None:
            pass

        async with self.patch_broker(broker) as br:
            await br.publish_batch("hello", topic=queue)
            m.mock.assert_called_once_with(["hello"])

    async def test_batch_publisher_mock(
        self,
        queue: str,
    ) -> None:
        """A batch publisher's mock records the handler's tuple reply as a list."""
        broker = self.get_broker()

        publisher = broker.publisher(queue + "1", batch=True)

        @publisher
        @broker.subscriber(queue)
        async def m(msg):
            return 1, 2, 3

        async with self.patch_broker(broker) as br:
            await br.publish("hello", queue)
            m.mock.assert_called_once_with("hello")
            publisher.mock.assert_called_once_with([1, 2, 3])

    async def test_respect_middleware(self, queue: str) -> None:
        """The broker middleware's ``on_receive`` runs once per published message."""
        routes = []

        class Middleware(BaseMiddleware):
            async def on_receive(self) -> None:
                routes.append(None)
                return await super().on_receive()

        broker = self.get_broker(middlewares=(Middleware,))

        @broker.subscriber(queue)
        async def h1(msg) -> None: ...

        @broker.subscriber(queue + "1")
        async def h2(msg) -> None: ...

        async with self.patch_broker(broker) as br:
            await br.publish("", queue)
            await br.publish("", queue + "1")

        assert len(routes) == 2

    @pytest.mark.connected()
    async def test_real_respect_middleware(self, queue: str) -> None:
        """Same as ``test_respect_middleware`` but with ``with_real=True``."""
        routes = []

        class Middleware(BaseMiddleware):
            async def on_receive(self) -> None:
                routes.append(None)
                return await super().on_receive()

        broker = self.get_broker(middlewares=(Middleware,))

        @broker.subscriber(queue)
        async def h1(msg) -> None: ...

        @broker.subscriber(queue + "1")
        async def h2(msg) -> None: ...

        async with self.patch_broker(broker, with_real=True) as br:
            await br.publish("", queue)
            await br.publish("", queue + "1")
            # Wait for both handlers before counting middleware calls.
            await h1.wait_call(3)
            await h2.wait_call(3)

        assert len(routes) == 2

    async def test_multiple_subscribers_different_groups(
        self,
        queue: str,
    ) -> None:
        """Subscribers on one topic with different ``group_id`` each get the message."""
        test_broker = self.get_broker()

        @test_broker.subscriber(queue, group_id="group1")
        async def subscriber1(msg) -> None: ...

        @test_broker.subscriber(queue, group_id="group2")
        async def subscriber2(msg) -> None: ...

        async with self.patch_broker(test_broker) as br:
            await br.start()
            await br.publish("", queue)

            assert subscriber1.mock.call_count == 1
            assert subscriber2.mock.call_count == 1

    async def test_multiple_subscribers_same_group(self, queue: str) -> None:
        """Only one of two subscribers sharing a ``group_id`` gets the message."""
        broker = self.get_broker()

        @broker.subscriber(queue, group_id="group1")
        async def subscriber1(msg) -> None: ...

        @broker.subscriber(queue, group_id="group1")
        async def subscriber2(msg) -> None: ...

        async with self.patch_broker(broker) as br:
            await br.start()
            await br.publish("", queue)

            # we can't guarantee the order of calls
            assert {subscriber1.mock.call_count, subscriber2.mock.call_count} == {1, 0}

    async def test_multiple_batch_subscriber_with_different_group(
        self,
        queue: str,
    ) -> None:
        """Batch subscribers with different ``group_id`` each get the message."""
        broker = self.get_broker()

        @broker.subscriber(queue, batch=True, group_id="group1")
        async def subscriber1(msg) -> None: ...

        @broker.subscriber(queue, batch=True, group_id="group2")
        async def subscriber2(msg) -> None: ...

        async with self.patch_broker(broker) as br:
            await br.start()
            await br.publish("", queue)

            assert subscriber1.mock.call_count == 1
            assert subscriber2.mock.call_count == 1

    async def test_multiple_batch_subscriber_with_same_group(
        self,
        queue: str,
    ) -> None:
        """Only one of two batch subscribers sharing a ``group_id`` gets the message."""
        broker = self.get_broker()

        @broker.subscriber(queue, batch=True, group_id="group1")
        async def subscriber1(msg) -> None: ...

        @broker.subscriber(queue, batch=True, group_id="group1")
        async def subscriber2(msg) -> None: ...

        async with self.patch_broker(broker) as br:
            await br.start()
            await br.publish("", queue)

            # we can't guarantee the order of calls
            assert {subscriber1.mock.call_count, subscriber2.mock.call_count} == {1, 0}

    @pytest.mark.connected()
    async def test_broker_gets_patched_attrs_within_cm(self) -> None:
        """Run the shared base-class check, passing the Kafka ``FakeProducer``."""
        await super().test_broker_gets_patched_attrs_within_cm(FakeProducer)

    @pytest.mark.connected()
    async def test_broker_with_real_doesnt_get_patched(self) -> None:
        """Run the shared base-class check under the ``connected`` marker."""
        await super().test_broker_with_real_doesnt_get_patched()

    @pytest.mark.connected()
    async def test_broker_with_real_patches_publishers_and_subscribers(
        self,
        queue: str,
    ) -> None:
        """Run the shared base-class check under the ``connected`` marker."""
        await super().test_broker_with_real_patches_publishers_and_subscribers(queue)

    @pytest.mark.xfail(reason="https://github.com/ag2ai/faststream/issues/2513")
    async def test_publisher_without_destination(self) -> None:
        """Regression test for https://github.com/ag2ai/faststream/issues/2513.

        Publishers declared with an empty topic are given the topic at publish
        time. Marked ``xfail`` with the issue as the reason.
        """
        broker = self.get_broker()

        # use two publishers to check that we don't have conflicts
        publisher = broker.publisher(topic="")
        another_publisher = broker.publisher(topic="")

        async with self.patch_broker(broker):
            await publisher.publish(None, topic="new-key")
            publisher.mock.assert_called_once()

            await another_publisher.publish(None, topic="new-key")
            another_publisher.mock.assert_called_once()