Coverage for tests / brokers / base / consume.py: 99%
199 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from unittest.mock import MagicMock, call
4import anyio
5import pytest
6from pydantic import BaseModel
8from faststream import Context, Depends
9from faststream.exceptions import StopConsume
11from .basic import BaseTestcaseConfig
@pytest.mark.asyncio()
class BrokerConsumeTestcase(BaseTestcaseConfig):
    """Broker-agnostic consume scenarios.

    Each test builds a broker via ``self.get_broker``, derives the subscriber
    arguments via ``self.get_subscriber_params`` and runs the broker inside
    ``self.patch_broker``. Those helpers and ``self.timeout`` are not defined
    in this class (presumably supplied by ``BaseTestcaseConfig`` or a concrete
    subclass -- confirm there).
    """

    async def test_consume(
        self,
        queue: str,
    ) -> None:
        """Publish one message and expect the handler to fire in time."""
        received = asyncio.Event()

        broker = self.get_broker()

        sub_args, sub_kwargs = self.get_subscriber_params(queue)

        @broker.subscriber(*sub_args, **sub_kwargs)
        def subscriber(m) -> None:
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            # Publish and wait concurrently, bounded by the suite timeout.
            tasks = (
                asyncio.create_task(br.publish("hello", queue)),
                asyncio.create_task(received.wait()),
            )
            await asyncio.wait(tasks, timeout=self.timeout)

        assert received.is_set()

    async def test_consume_from_multi(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Attach one handler to two subscribers and expect a call from each."""
        broker = self.get_broker()

        first_hit = asyncio.Event()
        second_hit = asyncio.Event()

        second_queue = queue + "1"
        first_args, first_kwargs = self.get_subscriber_params(queue)
        second_args, second_kwargs = self.get_subscriber_params(second_queue)

        @broker.subscriber(*first_args, **first_kwargs)
        @broker.subscriber(*second_args, **second_kwargs)
        def subscriber(m) -> None:
            mock()
            # First delivery sets ``first_hit``; any later one sets
            # ``second_hit``.
            if first_hit.is_set():
                second_hit.set()
            else:
                first_hit.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            tasks = (
                asyncio.create_task(br.publish("hello", queue)),
                asyncio.create_task(br.publish("hello", second_queue)),
                asyncio.create_task(first_hit.wait()),
                asyncio.create_task(second_hit.wait()),
            )
            await asyncio.wait(tasks, timeout=self.timeout)

        assert first_hit.is_set()
        assert second_hit.is_set()
        assert mock.call_count == 2

    async def test_consume_double(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Publish twice to one queue and expect two handler calls."""
        broker = self.get_broker()

        first_hit = asyncio.Event()
        second_hit = asyncio.Event()

        sub_args, sub_kwargs = self.get_subscriber_params(queue)

        @broker.subscriber(*sub_args, **sub_kwargs)
        async def handler(m) -> None:
            mock()
            # First delivery sets ``first_hit``; the next one sets
            # ``second_hit``.
            if first_hit.is_set():
                second_hit.set()
            else:
                first_hit.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            tasks = (
                asyncio.create_task(br.publish("hello", queue)),
                asyncio.create_task(br.publish("hello", queue)),
                asyncio.create_task(first_hit.wait()),
                asyncio.create_task(second_hit.wait()),
            )
            await asyncio.wait(tasks, timeout=self.timeout)

        assert second_hit.is_set()
        assert first_hit.is_set()
        assert mock.call_count == 2

    async def test_different_consume(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Register two handlers on two queues; each is called exactly once."""
        broker = self.get_broker()

        first_done = asyncio.Event()
        second_done = asyncio.Event()

        sub_args, sub_kwargs = self.get_subscriber_params(queue)

        @broker.subscriber(*sub_args, **sub_kwargs)
        def handler(m) -> None:
            mock.handler()
            first_done.set()

        another_topic = queue + "1"
        sub_args, sub_kwargs = self.get_subscriber_params(another_topic)

        @broker.subscriber(*sub_args, **sub_kwargs)
        def handler2(m) -> None:
            mock.handler2()
            second_done.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            tasks = (
                asyncio.create_task(br.publish("hello", queue)),
                asyncio.create_task(br.publish("hello", another_topic)),
                asyncio.create_task(first_done.wait()),
                asyncio.create_task(second_done.wait()),
            )
            await asyncio.wait(tasks, timeout=self.timeout)

        assert first_done.is_set()
        assert second_done.is_set()
        mock.handler.assert_called_once()
        mock.handler2.assert_called_once()

    async def test_consume_with_filter(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Route messages of one subscriber between a filtered and a plain handler.

        ``handler`` is registered with a filter on the ``application/json``
        content type, ``handler2`` with no filter. The assertions expect the
        dict payload at ``handler`` and the plain string at ``handler2``.
        """
        broker = self.get_broker()

        json_done = asyncio.Event()
        other_done = asyncio.Event()

        sub_args, sub_kwargs = self.get_subscriber_params(
            queue,
        )

        sub = broker.subscriber(*sub_args, **sub_kwargs)

        @sub(filter=lambda m: m.content_type == "application/json")
        async def handler(m) -> None:
            mock.handler(m)
            json_done.set()

        @sub
        async def handler2(m) -> None:
            mock.handler2(m)
            other_done.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            tasks = (
                asyncio.create_task(br.publish({"msg": "hello"}, queue)),
                asyncio.create_task(br.publish("hello", queue)),
                asyncio.create_task(json_done.wait()),
                asyncio.create_task(other_done.wait()),
            )
            await asyncio.wait(tasks, timeout=self.timeout)

        assert json_done.is_set()
        assert other_done.is_set()
        mock.handler.assert_called_once_with({"msg": "hello"})
        mock.handler2.assert_called_once_with("hello")

    async def test_consume_validate_false(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Consume with ``serializer=None`` and expect un-coerced arguments.

        The handler annotates the body as ``Foo`` and the dependency as
        ``int``, yet the final assertion expects the raw dict and the string
        ``"100"`` exactly as produced.
        """
        received = asyncio.Event()

        broker = self.get_broker(
            apply_types=True,
            serializer=None,
        )

        class Foo(BaseModel):
            x: int

        def dependency() -> str:
            return "100"

        sub_args, sub_kwargs = self.get_subscriber_params(queue)

        @broker.subscriber(*sub_args, **sub_kwargs)
        async def handler(
            m: Foo,
            dep: int = Depends(dependency),
            broker=Context(),
        ) -> None:
            mock(m, dep, broker)
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            tasks = (
                asyncio.create_task(br.publish({"x": 1}, queue)),
                asyncio.create_task(received.wait()),
            )
            await asyncio.wait(tasks, timeout=self.timeout)

        assert received.is_set()
        mock.assert_called_once_with({"x": 1}, "100", broker)

    async def test_dynamic_sub(self, queue: str) -> None:
        """Create and start a subscriber after the broker is already running."""
        received = asyncio.Event()

        broker = self.get_broker()

        async def subscriber(m) -> None:
            received.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            # The subscriber is built, given its handler and started by hand
            # only once the broker itself has started.
            sub_args, sub_kwargs = self.get_subscriber_params(queue)
            sub = br.subscriber(*sub_args, **sub_kwargs)
            sub(subscriber)
            await sub.start()

            await br.publish("hello", queue)

            with anyio.move_on_after(self.timeout):
                await received.wait()

            await sub.stop()

        assert received.is_set()

    async def test_get_one_conflicts_with_handler(self, queue) -> None:
        """Expect ``get_one`` to raise on a subscriber that has a handler."""
        broker = self.get_broker(apply_types=True)
        sub_args, sub_kwargs = self.get_subscriber_params(queue)
        subscriber = broker.subscriber(*sub_args, **sub_kwargs)

        @subscriber
        async def t() -> None: ...

        async with self.patch_broker(broker) as br:
            await br.start()

            with pytest.raises(AssertionError):
                await subscriber.get_one(timeout=1e-24)
@pytest.mark.asyncio()
class BrokerRealConsumeTestcase(BrokerConsumeTestcase):
    """Extra consume scenarios layered on ``BrokerConsumeTestcase``.

    Adds tests for ``subscriber.get_one``, for a handler raising
    ``StopConsume`` and for ``async for`` iteration over a subscriber.
    The class-level ``asyncio`` mark already covers every test method, so
    no method repeats it.
    """

    async def test_get_one(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Fetch one published message with ``get_one`` and check its body."""
        broker = self.get_broker(apply_types=True)

        args, kwargs = self.get_subscriber_params(queue)
        subscriber = broker.subscriber(*args, **kwargs)

        async with self.patch_broker(broker) as br:
            await br.start()

            async def consume() -> None:
                mock(await subscriber.get_one(timeout=self.timeout))

            async def publish() -> None:
                # Minimal sleep before publishing; presumably lets
                # ``consume`` reach ``get_one`` first -- confirm.
                await anyio.sleep(1e-24)
                await br.publish("test_message", queue)

            await asyncio.wait(
                (
                    asyncio.create_task(consume()),
                    asyncio.create_task(publish()),
                ),
                timeout=self.timeout,
            )

            mock.assert_called_once()
            message = mock.call_args[0][0]
            assert message
            # Decode once and reuse the value as the failure message.
            decoded = await message.decode()
            assert decoded == "test_message", decoded

    async def test_get_one_timeout(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Expect ``get_one`` to return ``None`` when nothing is published."""
        broker = self.get_broker(apply_types=True)
        args, kwargs = self.get_subscriber_params(queue)
        subscriber = broker.subscriber(*args, **kwargs)

        async with self.patch_broker(broker) as br:
            await br.start()

            mock(await subscriber.get_one(timeout=1e-24))
            mock.assert_called_once_with(None)

    @pytest.mark.slow()
    async def test_stop_consume_exc(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Expect no further deliveries after the handler raises ``StopConsume``.

        A second message is published after the first one was handled; the
        handler must still have been called exactly once.
        """
        event = asyncio.Event()

        consume_broker = self.get_broker()

        args, kwargs = self.get_subscriber_params(queue)

        @consume_broker.subscriber(*args, **kwargs)
        def subscriber(m):
            mock()
            event.set()
            raise StopConsume

        async with self.patch_broker(consume_broker) as br:
            await br.start()
            await asyncio.wait(
                (
                    asyncio.create_task(br.publish("hello", queue)),
                    asyncio.create_task(event.wait()),
                ),
                timeout=self.timeout,
            )
            # Pause, publish a second time, then pause again so that a
            # wrongly delivered second message would show up in ``mock``.
            await asyncio.sleep(0.5)
            await br.publish("hello", queue)
            await asyncio.sleep(0.5)

        assert event.is_set()
        mock.assert_called_once()

    async def test_iteration(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Iterate a subscriber with ``async for`` and check the message order."""
        expected_messages = ("test_message_1", "test_message_2")

        broker = self.get_broker(apply_types=True)

        args, kwargs = self.get_subscriber_params(queue)
        subscriber = broker.subscriber(*args, **kwargs)

        async with self.patch_broker(broker) as br:
            await br.start()

            async def publish_test_message() -> None:
                for msg in expected_messages:
                    await br.publish(msg, queue)

            async def consume() -> None:
                index_message = 0
                # The loop is left with ``break`` once every expected message
                # has been seen, so the iterator is never exhausted.
                async for msg in subscriber:
                    result_message = await msg.decode()

                    mock(result_message)

                    index_message += 1
                    if index_message >= len(expected_messages):
                        break

            await asyncio.wait(
                (
                    asyncio.create_task(consume()),
                    asyncio.create_task(publish_test_message()),
                ),
                timeout=self.timeout,
            )

            calls = [call(msg) for msg in expected_messages]
            mock.assert_has_calls(calls=calls)