Coverage for tests / brokers / base / publish.py: 99%
228 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from contextlib import suppress
3from dataclasses import asdict, dataclass
4from datetime import datetime, timezone
5from typing import Any
6from unittest.mock import MagicMock
8import anyio
9import pytest
10from pydantic import BaseModel
12from faststream import BaseMiddleware, Context, Response
13from faststream._internal._compat import dump_json, model_to_json
14from faststream.exceptions import SubscriberNotFound
16from .basic import BaseTestcaseConfig
# Minimal pydantic model with one string field. It backs the "model" cases
# (bytes->model, model->model, model->dict, dict->model) that are added to the
# parametrization of BrokerPublishTestcase.test_serialize.
class SimpleModel(BaseModel):
    r: str  # the only payload field; the test cases use the value "hello!"
# Standard-library dataclass with one string field. It backs the "dataclass"
# cases of the shared `parametrized` tuple defined below in this module.
@dataclass
class SimpleDataclass:
    r: str  # the only payload field; the test cases use the value "hello!"
# Evaluated once at import time: the "datetime->datetime" case publishes this
# exact value and expects to get the same value back.
now = datetime.now(tz=timezone.utc)

# Shared serialization cases, each one a
# (message, message_type, expected_message) triple as consumed by
# BrokerPublishTestcase.test_serialize. The ids read
# "<published value kind>-><handler annotation kind>".
parametrized = (
    # Scalars.
    pytest.param("hello", str, "hello", id="str->str"),
    pytest.param(b"hello", bytes, b"hello", id="bytes->bytes"),
    pytest.param(1, int, 1, id="int->int"),
    pytest.param(1.0, float, 1.0, id="float->float"),
    pytest.param(1, float, 1.0, id="int->float"),
    pytest.param(False, bool, False, id="bool->bool"),
    # Builtin containers and datetime.
    pytest.param({"m": 1}, dict[str, int], {"m": 1}, id="dict->dict"),
    pytest.param([1, 2, 3], list[int], [1, 2, 3], id="list->list"),
    pytest.param(now, datetime, now, id="datetime->datetime"),
    # Dataclass round-trips (a fresh instance is built for every slot).
    pytest.param(
        dump_json(asdict(SimpleDataclass(r="hello!"))),
        SimpleDataclass,
        SimpleDataclass(r="hello!"),
        id="bytes->dataclass",
    ),
    pytest.param(
        SimpleDataclass(r="hello!"),
        SimpleDataclass,
        SimpleDataclass(r="hello!"),
        id="dataclass->dataclass",
    ),
    pytest.param(
        SimpleDataclass(r="hello!"),
        dict,
        {"r": "hello!"},
        id="dataclass->dict",
    ),
    pytest.param(
        {"r": "hello!"},
        SimpleDataclass,
        SimpleDataclass(r="hello!"),
        id="dict->dataclass",
    ),
)
class BrokerPublishTestcase(BaseTestcaseConfig):
    """Broker-agnostic publishing tests.

    The tests use ``self.get_broker``, ``self.patch_broker``,
    ``self.get_subscriber_params`` and ``self.timeout``, none of which are
    defined in this class; they are presumably supplied by
    ``BaseTestcaseConfig`` or a concrete broker subclass (confirm there).
    ``queue`` and ``mock`` are pytest fixtures defined outside this module.
    """

    async def _wait_for_tasks(self, *coros: Any) -> None:
        """Run ``coros`` concurrently for at most ``self.timeout`` seconds.

        ``asyncio.wait`` does not cancel anything when its timeout expires,
        so tasks that are still pending afterwards are cancelled and drained
        here instead of being left running behind the test.

        Exceptions raised inside the tasks are intentionally not re-raised
        (a bare ``asyncio.wait`` does not re-raise them either); each test
        detects failure through its own assertions afterwards.

        Args:
            *coros: Coroutine objects, wrapped into tasks in the given order.
        """
        tasks = [asyncio.create_task(coro) for coro in coros]
        _, pending = await asyncio.wait(tasks, timeout=self.timeout)

        for task in pending:
            task.cancel()

        if pending:
            # Let the cancellations finish; CancelledError results are
            # collected rather than raised.
            await asyncio.gather(*pending, return_exceptions=True)

    @pytest.mark.asyncio()
    @pytest.mark.parametrize(
        ("message", "message_type", "expected_message"),
        (
            *parametrized,
            pytest.param(
                model_to_json(SimpleModel(r="hello!")).encode(),
                SimpleModel,
                SimpleModel(r="hello!"),
                id="bytes->model",
            ),
            pytest.param(
                SimpleModel(r="hello!"),
                SimpleModel,
                SimpleModel(r="hello!"),
                id="model->model",
            ),
            pytest.param(
                SimpleModel(r="hello!"),
                dict,
                {"r": "hello!"},
                id="model->dict",
            ),
            pytest.param(
                {"r": "hello!"},
                SimpleModel,
                SimpleModel(r="hello!"),
                id="dict->model",
            ),
        ),
    )
    async def test_serialize(
        self,
        queue: str,
        message: Any,
        message_type: Any,
        expected_message: Any,
        mock: MagicMock,
    ) -> None:
        """Publish ``message`` and check what the typed handler receives.

        The handler argument is annotated with ``message_type``; the test
        passes when the handler was last called with ``expected_message``.
        """
        event = asyncio.Event()

        pub_broker = self.get_broker(apply_types=True)

        args, kwargs = self.get_subscriber_params(queue)

        @pub_broker.subscriber(*args, **kwargs)
        async def handler(m: message_type) -> None:
            event.set()
            mock(m)

        async with self.patch_broker(pub_broker) as br:
            await br.start()

            await self._wait_for_tasks(
                br.publish(message, queue),
                event.wait(),
            )

        mock.assert_called_with(expected_message)

    @pytest.mark.asyncio()
    async def test_response(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Check that a returned ``Response`` carries body, headers and id.

        The first handler returns a ``Response`` and is also registered as a
        publisher to ``queue + "1"``. The second subscriber reads the body,
        the ``custom`` header and the correlation id from the message object
        obtained through ``Context("message")``.
        """
        event = asyncio.Event()

        pub_broker = self.get_broker(apply_types=True)

        args, kwargs = self.get_subscriber_params(queue)

        @pub_broker.subscriber(*args, **kwargs)
        @pub_broker.publisher(queue + "1")
        async def m():
            return Response(1, headers={"custom": "1"}, correlation_id="1")

        args2, kwargs2 = self.get_subscriber_params(queue + "1")

        @pub_broker.subscriber(*args2, **kwargs2)
        async def m_next(msg=Context("message")) -> None:
            event.set()
            mock(
                body=msg.body,
                headers=msg.headers["custom"],
                correlation_id=msg.correlation_id,
            )

        async with self.patch_broker(pub_broker) as br:
            await br.start()
            await self._wait_for_tasks(
                br.publish(None, queue),
                event.wait(),
            )

        mock.assert_called_with(
            body=b"1",
            correlation_id="1",
            headers="1",
        )

    @pytest.mark.asyncio()
    async def test_unwrap_dict(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Publish a dict and expect its keys to arrive as handler arguments.

        The handler declares ``a: int`` and ``b: int``; the published value
        for ``b`` is the float ``1.0`` and the expected call carries ``1``.
        """
        event = asyncio.Event()

        pub_broker = self.get_broker(apply_types=True)

        args, kwargs = self.get_subscriber_params(queue)

        @pub_broker.subscriber(*args, **kwargs)
        async def m(a: int, b: int) -> None:
            event.set()
            mock({"a": a, "b": b})

        async with self.patch_broker(pub_broker) as br:
            await br.start()
            await self._wait_for_tasks(
                br.publish({"a": 1, "b": 1.0}, queue),
                event.wait(),
            )

        mock.assert_called_with(
            {
                "a": 1,
                "b": 1,
            },
        )

    @pytest.mark.asyncio()
    async def test_unwrap_list(
        self,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """Publish a list and expect it spread over positional arguments.

        The first two items are expected as ``a`` and ``b`` and the rest as
        ``*args``; the published floats are expected to arrive as ints.
        """
        event = asyncio.Event()

        pub_broker = self.get_broker(apply_types=True)

        args, kwargs = self.get_subscriber_params(queue)

        @pub_broker.subscriber(*args, **kwargs)
        async def m(a: int, b: int, *args: tuple[int, ...]) -> None:
            event.set()
            mock({"a": a, "b": b, "args": args})

        async with self.patch_broker(pub_broker) as br:
            await br.start()
            await self._wait_for_tasks(
                br.publish([1, 2.0, 3.0, 4.0], queue),
                event.wait(),
            )

        mock.assert_called_with({"a": 1, "b": 2, "args": (3, 4)})

    @pytest.mark.asyncio()
    async def test_base_publisher(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Check the ``@broker.publisher(...)`` decorator form.

        The handler's return value ``""`` is expected to reach the subscriber
        of ``queue + "resp"`` exactly once.
        """
        event = asyncio.Event()

        pub_broker = self.get_broker(apply_types=True)

        args, kwargs = self.get_subscriber_params(queue)

        @pub_broker.subscriber(*args, **kwargs)
        @pub_broker.publisher(queue + "resp")
        async def m() -> str:
            return ""

        args2, kwargs2 = self.get_subscriber_params(queue + "resp")

        @pub_broker.subscriber(*args2, **kwargs2)
        async def resp(msg) -> None:
            event.set()
            mock(msg)

        async with self.patch_broker(pub_broker) as br:
            await br.start()
            await self._wait_for_tasks(
                br.publish("", queue),
                event.wait(),
            )

        assert event.is_set()
        mock.assert_called_once_with("")

    @pytest.mark.asyncio()
    async def test_publisher_object(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Check a publisher object applied as the outermost decorator.

        Same expectation as ``test_base_publisher``, but the publisher is
        created first and stacked on top of the subscriber decorator.
        """
        event = asyncio.Event()

        pub_broker = self.get_broker(apply_types=True)

        publisher = pub_broker.publisher(queue + "resp")

        args, kwargs = self.get_subscriber_params(queue)

        @publisher
        @pub_broker.subscriber(*args, **kwargs)
        async def m() -> str:
            return ""

        # Named args2/kwargs2 like the sibling tests instead of rebinding
        # args/kwargs.
        args2, kwargs2 = self.get_subscriber_params(queue + "resp")

        @pub_broker.subscriber(*args2, **kwargs2)
        async def resp(msg) -> None:
            event.set()
            mock(msg)

        async with self.patch_broker(pub_broker) as br:
            await br.start()
            await self._wait_for_tasks(
                br.publish("", queue),
                event.wait(),
            )

        mock.assert_called_once_with("")

    @pytest.mark.asyncio()
    async def test_publish_manual(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Check calling ``publisher.publish`` explicitly inside a handler."""
        event = asyncio.Event()

        pub_broker = self.get_broker(apply_types=True)

        publisher = pub_broker.publisher(queue + "resp")

        args, kwargs = self.get_subscriber_params(queue)

        @pub_broker.subscriber(*args, **kwargs)
        async def m() -> None:
            await publisher.publish("")

        args2, kwargs2 = self.get_subscriber_params(queue + "resp")

        @pub_broker.subscriber(*args2, **kwargs2)
        async def resp(msg) -> None:
            event.set()
            mock(msg)

        async with self.patch_broker(pub_broker) as br:
            await br.start()
            await self._wait_for_tasks(
                br.publish("", queue),
                event.wait(),
            )

        mock.assert_called_once_with("")

    @pytest.mark.asyncio()
    async def test_multiple_publishers(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Check one handler decorated with two publishers.

        Both ``queue + "resp"`` and ``queue + "resp2"`` subscribers are
        expected to be called exactly once with ``""``.
        """
        pub_broker = self.get_broker(apply_types=True)

        event = anyio.Event()
        event2 = anyio.Event()

        args, kwargs = self.get_subscriber_params(queue)

        @pub_broker.publisher(queue + "resp2")
        @pub_broker.subscriber(*args, **kwargs)
        @pub_broker.publisher(queue + "resp")
        async def m() -> str:
            return ""

        args2, kwargs2 = self.get_subscriber_params(queue + "resp")

        @pub_broker.subscriber(*args2, **kwargs2)
        async def resp(msg) -> None:
            event.set()
            mock.resp1(msg)

        args3, kwargs3 = self.get_subscriber_params(queue + "resp2")

        @pub_broker.subscriber(*args3, **kwargs3)
        async def resp2(msg) -> None:
            event2.set()
            mock.resp2(msg)

        async with self.patch_broker(pub_broker) as br:
            await br.start()
            await self._wait_for_tasks(
                br.publish("", queue),
                event.wait(),
                event2.wait(),
            )

        mock.resp1.assert_called_once_with("")
        mock.resp2.assert_called_once_with("")

    @pytest.mark.asyncio()
    async def test_reusable_publishers(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Check one publisher object shared by two different handlers.

        Messages are published to both source queues and the shared
        ``queue + "resp"`` subscriber is expected to be called twice.
        """
        pub_broker = self.get_broker(apply_types=True)

        consume = anyio.Event()
        consume2 = anyio.Event()

        pub = pub_broker.publisher(queue + "resp")

        args, kwargs = self.get_subscriber_params(queue)

        @pub
        @pub_broker.subscriber(*args, **kwargs)
        async def m() -> str:
            return ""

        args2, kwargs2 = self.get_subscriber_params(queue + "2")

        @pub
        @pub_broker.subscriber(*args2, **kwargs2)
        async def m2() -> str:
            return ""

        args3, kwargs3 = self.get_subscriber_params(queue + "resp")

        @pub_broker.subscriber(*args3, **kwargs3)
        async def resp() -> None:
            # First delivery sets `consume`, the following one `consume2`.
            if not consume.is_set():
                consume.set()
            else:
                consume2.set()
            mock()

        async with self.patch_broker(pub_broker) as br:
            await br.start()
            await self._wait_for_tasks(
                br.publish("", queue),
                br.publish("", queue + "2"),
                consume.wait(),
                consume2.wait(),
            )

        assert mock.call_count == 2

    @pytest.mark.asyncio()
    async def test_reply_to(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Check that ``reply_to`` routes the handler result to a reply queue.

        The handler echoes its input; the ``queue + "reply"`` subscriber is
        expected to receive ``"Hello!"``.
        """
        event = asyncio.Event()

        pub_broker = self.get_broker(apply_types=True)

        args, kwargs = self.get_subscriber_params(queue + "reply")

        @pub_broker.subscriber(*args, **kwargs)
        async def reply_handler(m) -> None:
            event.set()
            mock(m)

        args2, kwargs2 = self.get_subscriber_params(queue)

        @pub_broker.subscriber(*args2, **kwargs2)
        async def handler(m):
            return m

        async with self.patch_broker(pub_broker) as br:
            await br.start()

            await self._wait_for_tasks(
                br.publish("Hello!", queue, reply_to=queue + "reply"),
                event.wait(),
            )

        mock.assert_called_with("Hello!")

    @pytest.mark.asyncio()
    async def test_no_reply(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Check that a ``no_reply=True`` subscriber sends no reply.

        The message is published with ``reply_to`` set, but the reply
        handler's mock must stay uncalled. Because no reply is expected, the
        wait is released by a middleware instead of by the reply handler.
        """
        event = asyncio.Event()

        class Mid(BaseMiddleware):
            async def after_processed(self, *args: Any, **kwargs: Any):
                # Signal the test, then defer to the base implementation.
                event.set()

                return await super().after_processed(*args, **kwargs)

        pub_broker = self.get_broker(apply_types=True)
        pub_broker.add_middleware(Mid)

        args, kwargs = self.get_subscriber_params(queue + "reply")

        @pub_broker.subscriber(*args, **kwargs)
        async def reply_handler(m) -> None:
            mock(m)

        args2, kwargs2 = self.get_subscriber_params(queue, no_reply=True)

        @pub_broker.subscriber(*args2, **kwargs2)
        async def handler(m):
            return m

        async with self.patch_broker(pub_broker) as br:
            await br.start()

            await self._wait_for_tasks(
                br.publish("Hello!", queue, reply_to=queue + "reply"),
                event.wait(),
            )

        assert not mock.called

    @pytest.mark.asyncio()
    async def test_publisher_after_connect(self, queue: str) -> None:
        """Check that a publisher created on a patched broker can publish."""
        async with self.patch_broker(self.get_broker()) as br:
            # Should pass without error.
            # Suppress the TestClient error raised because there is no
            # suitable subscriber.
            with suppress(SubscriberNotFound):
                await br.publisher(queue).publish(None)

    @pytest.mark.asyncio()
    async def test_publisher_after_start(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Check a publisher that is created only after ``br.start()``."""
        event = asyncio.Event()

        pub_broker = self.get_broker(apply_types=True)

        args, kwargs = self.get_subscriber_params(queue)

        @pub_broker.subscriber(*args, **kwargs)
        async def handler(m) -> None:
            event.set()
            mock(m)

        async with self.patch_broker(pub_broker) as br:
            await br.start()

            pub = br.publisher(queue)

            await self._wait_for_tasks(
                pub.publish("Hello!"),
                event.wait(),
            )

        mock.assert_called_with("Hello!")