Coverage for tests / brokers / confluent / test_test_client.py: 99%
146 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from unittest.mock import AsyncMock, patch
4import pytest
6from faststream import AckPolicy, BaseMiddleware
7from faststream.confluent.annotations import KafkaMessage
8from faststream.confluent.message import FAKE_CONSUMER
9from faststream.confluent.testing import FakeProducer
10from tests.brokers.base.testclient import BrokerTestclientTestcase
11from tests.tools import spy_decorator
13from .basic import ConfluentMemoryTestcaseConfig
16@pytest.mark.confluent()
17@pytest.mark.asyncio()
18class TestTestclient(ConfluentMemoryTestcaseConfig, BrokerTestclientTestcase):
19 async def test_message_nack_seek(self, queue: str) -> None:
20 broker = self.get_broker(apply_types=True)
22 @broker.subscriber(
23 queue,
24 group_id=f"{queue}-consume",
25 auto_offset_reset="earliest",
26 ack_policy=AckPolicy.REJECT_ON_ERROR,
27 )
28 async def m(msg: KafkaMessage) -> None:
29 await msg.nack()
31 async with self.patch_broker(broker) as br:
32 with patch.object(
33 FAKE_CONSUMER,
34 "seek",
35 spy_decorator(FAKE_CONSUMER.seek),
36 ) as mocked:
37 await br.publish("hello", queue)
38 m.mock.assert_called_once_with("hello")
39 mocked.mock.assert_called_once()
41 @pytest.mark.connected()
42 async def test_with_real_testclient(self, queue: str) -> None:
43 event = asyncio.Event()
45 broker = self.get_broker()
47 args, kwargs = self.get_subscriber_params(queue)
49 @broker.subscriber(*args, **kwargs)
50 def subscriber(m) -> None:
51 event.set()
53 async with self.patch_broker(broker, with_real=True) as br:
54 await asyncio.wait(
55 (
56 asyncio.create_task(br.publish("hello", queue)),
57 asyncio.create_task(event.wait()),
58 ),
59 timeout=10,
60 )
62 assert event.is_set()
64 async def test_publisher_autoflush_mock(self, queue: str) -> None:
65 broker = self.get_broker()
67 publisher = broker.publisher(queue + "1", autoflush=True)
68 publisher.flush = AsyncMock()
70 @publisher
71 @broker.subscriber(queue)
72 async def m(msg):
73 pass
75 async with self.patch_broker(broker) as br:
76 await br.publish("hello", queue)
78 m.mock.assert_called_once_with("hello")
79 publisher.mock.assert_called_once()
81 publisher.flush.assert_awaited_once()
83 async def test_batch_publisher_autoflush_mock(self, queue: str) -> None:
84 broker = self.get_broker()
86 publisher = broker.publisher(queue + "1", batch=True, autoflush=True)
87 publisher.flush = AsyncMock()
89 @publisher
90 @broker.subscriber(queue)
91 async def m(msg):
92 return 1, 2, 3
94 async with self.patch_broker(broker) as br:
95 await br.publish("hello", queue)
97 m.mock.assert_called_once_with("hello")
98 publisher.mock.assert_called_once_with([1, 2, 3])
100 publisher.flush.assert_awaited_once()
102 async def test_batch_pub_by_default_pub(
103 self,
104 queue: str,
105 ) -> None:
106 broker = self.get_broker()
108 @broker.subscriber(queue, batch=True)
109 async def m(msg) -> None:
110 pass
112 async with self.patch_broker(broker) as br:
113 await br.publish("hello", queue)
114 m.mock.assert_called_once_with(["hello"])
116 async def test_batch_pub_by_pub_batch(
117 self,
118 queue: str,
119 ) -> None:
120 broker = self.get_broker()
122 @broker.subscriber(queue, batch=True)
123 async def m(msg) -> None:
124 pass
126 async with self.patch_broker(broker) as br:
127 await br.publish_batch("hello", topic=queue)
128 m.mock.assert_called_once_with(["hello"])
130 async def test_batch_publisher_mock(self, queue: str) -> None:
131 broker = self.get_broker()
133 publisher = broker.publisher(queue + "1", batch=True)
135 @publisher
136 @broker.subscriber(queue)
137 async def m(msg):
138 return 1, 2, 3
140 async with self.patch_broker(broker) as br:
141 await br.publish("hello", queue)
142 m.mock.assert_called_once_with("hello")
143 publisher.mock.assert_called_once_with([1, 2, 3])
145 async def test_respect_middleware(self, queue: str) -> None:
146 routes = []
148 class Middleware(BaseMiddleware):
149 async def on_receive(self) -> None:
150 routes.append(None)
151 return await super().on_receive()
153 broker = self.get_broker(middlewares=(Middleware,))
155 @broker.subscriber(queue)
156 async def h1(msg) -> None: ...
158 @broker.subscriber(queue + "1")
159 async def h2(msg) -> None: ...
161 async with self.patch_broker(broker) as br:
162 await br.publish("", queue)
163 await br.publish("", queue + "1")
165 assert len(routes) == 2
167 @pytest.mark.connected()
168 async def test_real_respect_middleware(self, queue: str) -> None:
169 routes = []
171 class Middleware(BaseMiddleware):
172 async def on_receive(self) -> None:
173 routes.append(None)
174 return await super().on_receive()
176 broker = self.get_broker(middlewares=(Middleware,))
178 args, kwargs = self.get_subscriber_params(queue)
180 @broker.subscriber(*args, **kwargs)
181 async def h1(msg) -> None: ...
183 args2, kwargs2 = self.get_subscriber_params(queue + "1")
185 @broker.subscriber(*args2, **kwargs2)
186 async def h2(msg) -> None: ...
188 async with self.patch_broker(broker, with_real=True) as br:
189 await br.publish("", queue)
190 await br.publish("", queue + "1")
191 await h1.wait_call(10)
192 await h2.wait_call(10)
194 assert len(routes) == 2
196 async def test_multiple_subscribers_different_groups(self, queue: str) -> None:
197 broker = self.get_broker()
199 @broker.subscriber(queue, group_id="group1")
200 async def subscriber1(msg) -> None: ...
202 @broker.subscriber(queue, group_id="group2")
203 async def subscriber2(msg) -> None: ...
205 async with self.patch_broker(broker) as br:
206 await br.start()
207 await br.publish("", queue)
209 assert subscriber1.mock.call_count == 1
210 assert subscriber2.mock.call_count == 1
212 async def test_multiple_subscribers_same_group(self, queue: str) -> None:
213 broker = self.get_broker()
215 @broker.subscriber(queue, group_id="group1")
216 async def subscriber1(msg) -> None: ...
218 @broker.subscriber(queue, group_id="group1")
219 async def subscriber2(msg) -> None: ...
221 async with self.patch_broker(broker) as br:
222 await br.start()
223 await br.publish("", queue)
225 # we can't guarantee the order of calls
226 assert {subscriber1.mock.call_count, subscriber2.mock.call_count} == {1, 0}
228 async def test_multiple_batch_subscriber_with_different_group(
229 self, queue: str
230 ) -> None:
231 broker = self.get_broker()
233 @broker.subscriber(queue, batch=True, group_id="group1")
234 async def subscriber1(msg) -> None: ...
236 @broker.subscriber(queue, batch=True, group_id="group2")
237 async def subscriber2(msg) -> None: ...
239 async with self.patch_broker(broker) as br:
240 await br.start()
241 await br.publish("", queue)
243 assert subscriber1.mock.call_count == 1
244 assert subscriber2.mock.call_count == 1
246 async def test_multiple_batch_subscriber_with_same_group(self, queue: str) -> None:
247 broker = self.get_broker()
249 @broker.subscriber(queue, batch=True, group_id="group1")
250 async def subscriber1(msg) -> None: ...
252 @broker.subscriber(queue, batch=True, group_id="group1")
253 async def subscriber2(msg) -> None: ...
255 async with self.patch_broker(broker) as br:
256 await br.start()
257 await br.publish("", queue)
259 # we can't guarantee the order of calls
260 assert {subscriber1.mock.call_count, subscriber2.mock.call_count} == {1, 0}
262 @pytest.mark.connected()
263 async def test_broker_gets_patched_attrs_within_cm(self) -> None:
264 await super().test_broker_gets_patched_attrs_within_cm(FakeProducer)
266 @pytest.mark.connected()
267 async def test_broker_with_real_doesnt_get_patched(self) -> None:
268 await super().test_broker_with_real_doesnt_get_patched()
270 @pytest.mark.connected()
271 async def test_broker_with_real_patches_publishers_and_subscribers(
272 self, queue: str
273 ) -> None:
274 await super().test_broker_with_real_patches_publishers_and_subscribers(queue)
276 @pytest.mark.xfail(reason="https://github.com/ag2ai/faststream/issues/2513")
277 async def test_publisher_without_destination(self) -> None:
278 """Fixes https://github.com/ag2ai/faststream/issues/2513."""
279 broker = self.get_broker()
281 # use two publishers to check that we don't have conflicts
282 publisher = broker.publisher(topic="")
283 another_publisher = broker.publisher(topic="")
285 async with self.patch_broker(broker):
286 await publisher.publish(None, topic="new-key")
287 publisher.mock.assert_called_once()
289 await another_publisher.publish(None, topic="new-key")
290 another_publisher.mock.assert_called_once()