Coverage for tests / brokers / rabbit / test_test_client.py: 97%
119 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2import datetime as dt
3from typing import Any
4from unittest.mock import MagicMock
6import pytest
7from freezegun import freeze_time
9from faststream import BaseMiddleware
10from faststream.exceptions import SubscriberNotFound
11from faststream.rabbit import (
12 ExchangeType,
13 RabbitBroker,
14 RabbitExchange,
15 RabbitQueue,
16)
17from faststream.rabbit.annotations import RabbitMessage
18from faststream.rabbit.testing import FakeProducer, _is_handler_matches, apply_pattern
19from tests.brokers.base.testclient import BrokerTestclientTestcase
21from .basic import RabbitMemoryTestcaseConfig
22from .test_publish import TestPublishWithExchange as PublishWithExchangeCase
24_frozen_time = dt.datetime(2026, 2, 10, 12, 0, 0, tzinfo=dt.timezone.utc)
27@pytest.mark.rabbit()
28@pytest.mark.asyncio()
29class TestTestclient(
30 PublishWithExchangeCase, RabbitMemoryTestcaseConfig, BrokerTestclientTestcase
31):
32 @pytest.mark.connected()
33 async def test_with_real_testclient(
34 self,
35 queue: str,
36 ) -> None:
37 event = asyncio.Event()
39 broker = self.get_broker()
41 @broker.subscriber(queue)
42 def subscriber(m) -> None:
43 event.set()
45 async with self.patch_broker(broker, with_real=True) as br:
46 await asyncio.wait(
47 (
48 asyncio.create_task(br.publish("hello", queue)),
49 asyncio.create_task(event.wait()),
50 ),
51 timeout=3,
52 )
54 assert event.is_set()
56 async def test_direct_not_found(
57 self,
58 queue: str,
59 ) -> None:
60 broker = self.get_broker()
62 async with self.patch_broker(broker) as br:
63 with pytest.raises(SubscriberNotFound):
64 await br.request("", "")
66 @pytest.mark.xfail(reason="https://github.com/ag2ai/faststream/issues/2513")
67 async def test_publisher_without_destination(self) -> None:
68 """Fixes https://github.com/ag2ai/faststream/issues/2513."""
69 broker = self.get_broker()
71 # use two publishers to check that we don't have conflicts
72 publisher = broker.publisher(exchange="test_exchange")
73 another_publisher = broker.publisher(exchange="test_exchange")
75 async with self.patch_broker(broker):
76 await publisher.publish(None, routing_key="new-key")
77 publisher.mock.assert_called_once()
79 await another_publisher.publish(None, routing_key="new-key")
80 another_publisher.mock.assert_called_once()
82 async def test_consume_manual_ack(
83 self,
84 queue: str,
85 exchange: RabbitExchange,
86 ) -> None:
87 broker = self.get_broker(apply_types=True)
89 consume = asyncio.Event()
90 consume2 = asyncio.Event()
91 consume3 = asyncio.Event()
93 @broker.subscriber(queue=queue, exchange=exchange)
94 async def handler(msg: RabbitMessage) -> None:
95 await msg.raw_message.ack()
96 consume.set()
98 @broker.subscriber(queue=queue + "1", exchange=exchange)
99 async def handler2(msg: RabbitMessage) -> None:
100 await msg.raw_message.nack()
101 consume2.set()
102 raise ValueError
104 @broker.subscriber(queue=queue + "2", exchange=exchange)
105 async def handler3(msg: RabbitMessage) -> None:
106 await msg.raw_message.reject()
107 consume3.set()
108 raise ValueError
110 async with self.patch_broker(broker) as br:
111 await asyncio.wait(
112 (
113 asyncio.create_task(
114 br.publish("hello", queue=queue, exchange=exchange),
115 ),
116 asyncio.create_task(
117 br.publish("hello", queue=queue + "1", exchange=exchange),
118 ),
119 asyncio.create_task(
120 br.publish("hello", queue=queue + "2", exchange=exchange),
121 ),
122 asyncio.create_task(consume.wait()),
123 asyncio.create_task(consume2.wait()),
124 asyncio.create_task(consume3.wait()),
125 ),
126 timeout=3,
127 )
129 assert consume.is_set()
130 assert consume2.is_set()
131 assert consume3.is_set()
133 async def test_respect_middleware(self, queue: str) -> None:
134 routes = []
136 class Middleware(BaseMiddleware):
137 async def on_receive(self) -> None:
138 routes.append(None)
139 return await super().on_receive()
141 broker = self.get_broker(middlewares=(Middleware,))
143 @broker.subscriber(queue)
144 async def h1(msg) -> None: ...
146 @broker.subscriber(queue + "1")
147 async def h2(msg) -> None: ...
149 async with self.patch_broker(broker) as br:
150 await br.publish("", queue)
151 await br.publish("", queue + "1")
153 assert len(routes) == 2
155 @pytest.mark.connected()
156 async def test_real_respect_middleware(self, queue: str) -> None:
157 routes = []
159 class Middleware(BaseMiddleware):
160 async def on_receive(self) -> None:
161 routes.append(None)
162 return await super().on_receive()
164 broker = self.get_broker(middlewares=(Middleware,))
166 @broker.subscriber(queue)
167 async def h1(msg) -> None: ...
169 @broker.subscriber(queue + "1")
170 async def h2(msg) -> None: ...
172 async with self.patch_broker(broker, with_real=True) as br:
173 await br.publish("", queue)
174 await br.publish("", queue + "1")
175 await h1.wait_call(3)
176 await h2.wait_call(3)
178 assert len(routes) == 2
180 @pytest.mark.connected()
181 async def test_broker_gets_patched_attrs_within_cm(self) -> None:
182 await super().test_broker_gets_patched_attrs_within_cm(FakeProducer)
184 @pytest.mark.connected()
185 async def test_broker_with_real_doesnt_get_patched(self) -> None:
186 await super().test_broker_with_real_doesnt_get_patched()
188 @pytest.mark.connected()
189 async def test_broker_with_real_patches_publishers_and_subscribers(
190 self,
191 queue: str,
192 ) -> None:
193 await super().test_broker_with_real_patches_publishers_and_subscribers(queue)
195 @pytest.mark.asyncio()
196 @pytest.mark.parametrize(
197 ("expiration", "expected"),
198 (
199 pytest.param(None, None, id="none"),
200 pytest.param(1, 1, id="int"),
201 pytest.param(1.5, 1.5, id="float"),
202 pytest.param(dt.timedelta(seconds=1.1), 1.1, id="timedelta"),
203 pytest.param(_frozen_time, 0, id="datetime"),
204 ),
205 )
206 @freeze_time(_frozen_time)
207 async def test_publish_expiration_propagated(
208 self, expiration: Any, expected: Any, queue: str, mock: MagicMock
209 ) -> None:
210 broker = self.get_broker(apply_types=True)
212 args, kwargs = self.get_subscriber_params(queue)
214 @broker.subscriber(*args, **kwargs)
215 async def m(msg: RabbitMessage) -> None:
216 mock(msg)
218 async with self.patch_broker(broker) as br:
219 await br.start()
220 await br.publish("hello", queue, expiration=expiration)
221 msg = mock.call_args[0][0]
222 assert msg.raw_message.expiration == expected
225@pytest.mark.parametrize(
226 ("pattern", "current", "result"),
227 (
228 pytest.param("#", "1.2.3", True, id="#"),
229 pytest.param("*", "1", True, id="*"),
230 pytest.param("*", "1.2", False, id="* - broken"),
231 pytest.param("test.*", "test.1", True, id="test.*"),
232 pytest.param("test.#", "test.1", True, id="test.#"),
233 pytest.param("#.test.#", "1.2.test.1.2", True, id="#.test.#"),
234 pytest.param("#.test.*", "1.2.test.1", True, id="#.test.*"),
235 pytest.param("#.test.*.*", "1.2.test.1.2", True, id="#.test.*."),
236 pytest.param("#.test.*.*.*", "1.2.test.1.2", False, id="#.test.*.*.* - broken"),
237 pytest.param(
238 "#.test.*.test.#",
239 "1.2.test.1.test.1.2",
240 True,
241 id="#.test.*.test.#",
242 ),
243 pytest.param("#.*.test", "1.2.2.test", True, id="#.*.test"),
244 pytest.param("#.2.*.test", "1.2.2.test", True, id="#.2.*.test"),
245 pytest.param("#.*.*.test", "1.2.2.test", True, id="#.*.*.test"),
246 pytest.param("*.*.*.test", "1.2.test", False, id="*.*.*.test - broken"),
247 pytest.param("#.*.*.test", "1.2.test", False, id="#.*.*.test - broken"),
248 ),
249)
250def test(pattern: str, current: str, result: bool) -> None:
251 assert apply_pattern(pattern, current) == result
254exch_direct = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.DIRECT)
255exch_fanout = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.FANOUT)
256exch_topic = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.TOPIC)
257exch_headers = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.HEADERS)
258reqular_queue = RabbitQueue("test-reqular-queue", auto_delete=True)
260routing_key_queue = RabbitQueue(
261 "test-routing-key-queue",
262 auto_delete=True,
263 routing_key="*.info",
264)
265one_key_queue = RabbitQueue(
266 "test-one-key-queue",
267 auto_delete=True,
268 bind_arguments={"key": 1},
269)
270any_keys_queue = RabbitQueue(
271 "test-any-keys-queue",
272 auto_delete=True,
273 bind_arguments={"key": 2, "key2": 2, "x-match": "any"},
274)
275all_keys_queue = RabbitQueue(
276 "test-all-keys-queue",
277 auto_delete=True,
278 bind_arguments={"key": 2, "key2": 2, "x-match": "all"},
279)
281broker = RabbitBroker()
284@pytest.mark.rabbit()
285@pytest.mark.parametrize(
286 (
287 "queue",
288 "exchange",
289 "routing_key",
290 "headers",
291 "expected_result",
292 ),
293 (
294 pytest.param(
295 reqular_queue,
296 exch_direct,
297 reqular_queue.routing(),
298 {},
299 True,
300 id="direct match",
301 ),
302 pytest.param(
303 reqular_queue,
304 exch_direct,
305 "wrong key",
306 {},
307 False,
308 id="direct mismatch",
309 ),
310 pytest.param(
311 reqular_queue,
312 exch_fanout,
313 "",
314 {},
315 True,
316 id="fanout match",
317 ),
318 pytest.param(
319 routing_key_queue,
320 exch_topic,
321 "log.info",
322 {},
323 True,
324 id="topic match",
325 ),
326 pytest.param(
327 routing_key_queue,
328 exch_topic,
329 "log.wrong",
330 {},
331 False,
332 id="topic mismatch",
333 ),
334 pytest.param(
335 one_key_queue,
336 exch_headers,
337 "",
338 {"key": 1},
339 True,
340 id="one header match",
341 ),
342 pytest.param(
343 one_key_queue,
344 exch_headers,
345 "",
346 {"key": "wrong"},
347 False,
348 id="one header mismatch",
349 ),
350 pytest.param(
351 any_keys_queue,
352 exch_headers,
353 "",
354 {"key2": 2},
355 True,
356 id="any headers match",
357 ),
358 pytest.param(
359 any_keys_queue,
360 exch_headers,
361 "",
362 {"key2": "wrong"},
363 False,
364 id="any headers mismatch",
365 ),
366 pytest.param(
367 all_keys_queue,
368 exch_headers,
369 "",
370 {"key": 2, "key2": 2},
371 True,
372 id="all headers match",
373 ),
374 pytest.param(
375 all_keys_queue,
376 exch_headers,
377 "",
378 {"key": "wrong", "key2": 2},
379 False,
380 id="all headers mismatch",
381 ),
382 ),
383)
384def test_in_memory_routing(
385 queue: str,
386 exchange: RabbitExchange,
387 routing_key: str,
388 headers: dict[str, Any],
389 expected_result: bool,
390) -> None:
391 subscriber = broker.subscriber(queue, exchange)
392 assert (
393 _is_handler_matches(subscriber, routing_key, headers, exchange) is expected_result
394 )