Coverage for tests / brokers / base / parser.py: 99%
158 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from unittest.mock import MagicMock
4import anyio
5import pytest
7from faststream.message.message import StreamMessage
9from .basic import BaseTestcaseConfig
@pytest.mark.asyncio()
class LocalCustomParserTestcase(BaseTestcaseConfig):
    """Broker-agnostic tests for custom parsers and decoders.

    Each test builds a broker via ``self.get_broker()``, obtains subscriber
    arguments from ``self.get_subscriber_params()``, and bounds every wait
    with ``self.timeout``.
    """

    async def test_local_parser(
        self,
        mock: MagicMock,
        event: asyncio.Event,
        queue: str,
    ) -> None:
        """A parser passed to a subscriber runs once and sees the parsed body."""
        broker = self.get_broker()

        async def custom_parser(msg, original):
            # Delegate to the original parser first, then record the body.
            msg = await original(msg)
            mock(msg.body)
            return msg

        args, kwargs = self.get_subscriber_params(queue, parser=custom_parser)

        @broker.subscriber(*args, **kwargs)
        async def handle(m) -> None:
            event.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            await asyncio.wait(
                (
                    asyncio.create_task(br.publish(b"hello", queue)),
                    asyncio.create_task(event.wait()),
                ),
                timeout=self.timeout,
            )

        assert event.is_set()
        mock.assert_called_once_with(b"hello")

    async def test_local_sync_decoder(
        self,
        mock: MagicMock,
        event: asyncio.Event,
        queue: str,
    ) -> None:
        """A synchronous decoder passed to a subscriber is called exactly once."""
        broker = self.get_broker()

        def custom_decoder(msg):
            mock(msg.body)
            return msg

        args, kwargs = self.get_subscriber_params(queue, decoder=custom_decoder)

        @broker.subscriber(*args, **kwargs)
        async def handle(m) -> None:
            event.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            await asyncio.wait(
                (
                    asyncio.create_task(br.publish(b"hello", queue)),
                    asyncio.create_task(event.wait()),
                ),
                timeout=self.timeout,
            )

        assert event.is_set()
        mock.assert_called_once_with(b"hello")

    async def test_global_sync_decoder(
        self,
        mock: MagicMock,
        event: asyncio.Event,
        queue: str,
    ) -> None:
        """A synchronous decoder passed to the broker is called exactly once."""

        def custom_decoder(msg):
            mock(msg.body)
            return msg

        broker = self.get_broker(decoder=custom_decoder)

        args, kwargs = self.get_subscriber_params(queue)

        @broker.subscriber(*args, **kwargs)
        async def handle(m) -> None:
            event.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            await asyncio.wait(
                (
                    asyncio.create_task(br.publish(b"hello", queue)),
                    asyncio.create_task(event.wait()),
                ),
                timeout=self.timeout,
            )

        assert event.is_set()
        mock.assert_called_once_with(b"hello")

    async def test_local_parser_no_share_between_subscribers(
        self,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """A parser set on one subscriber is not applied to a second subscriber.

        The same handler is attached to two subscribers; only the one on
        ``queue`` has the custom parser, so one publish to each queue must
        record exactly one parser call.
        """
        event, event2 = asyncio.Event(), asyncio.Event()
        broker = self.get_broker()

        async def custom_parser(msg, original):
            msg = await original(msg)
            mock(msg.body)
            return msg

        args, kwargs = self.get_subscriber_params(queue, parser=custom_parser)
        args2, kwargs2 = self.get_subscriber_params(queue + "1")

        @broker.subscriber(*args, **kwargs)
        @broker.subscriber(*args2, **kwargs2)
        async def handle(m) -> None:
            # The first delivery sets `event`, the second sets `event2`,
            # regardless of which queue it came from.
            if event.is_set():
                event2.set()
            else:
                event.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            await asyncio.wait(
                (
                    asyncio.create_task(br.publish(b"hello", queue)),
                    asyncio.create_task(br.publish(b"hello", queue + "1")),
                    asyncio.create_task(event.wait()),
                    asyncio.create_task(event2.wait()),
                ),
                timeout=self.timeout,
            )

        assert event.is_set()
        assert event2.is_set()
        mock.assert_called_once_with(b"hello")

    async def test_local_parser_no_share_between_handlers(
        self,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """A parser set on one handler is not shared with a sibling handler.

        Two handlers are registered on one subscriber: the first filters on a
        JSON content type, the second carries the custom parser. One JSON and
        one raw-bytes message are published; the parser must be called once.
        """
        event, event2 = asyncio.Event(), asyncio.Event()

        broker = self.get_broker()

        args, kwargs = self.get_subscriber_params(queue)
        sub = broker.subscriber(*args, **kwargs)

        @sub(filter=lambda m: m.content_type == "application/json")
        async def handle(m) -> None:
            event.set()

        async def custom_parser(msg, original):
            msg = await original(msg)
            mock(msg.body)
            return msg

        @sub(parser=custom_parser)
        async def handle2(m) -> None:
            event2.set()

        async with self.patch_broker(broker) as br:
            await br.start()

            await asyncio.wait(
                (
                    asyncio.create_task(br.publish({"msg": "hello"}, queue)),
                    asyncio.create_task(br.publish(b"hello", queue)),
                    asyncio.create_task(event.wait()),
                    asyncio.create_task(event2.wait()),
                ),
                timeout=self.timeout,
            )

        assert event.is_set()
        assert event2.is_set()
        assert mock.call_count == 1

    @pytest.mark.connected()
    async def test_iterator_respect_decoder(
        self,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """Fixes https://github.com/ag2ai/faststream/issues/2554.

        Iterating a subscriber must not run the custom decoder; it runs only
        when ``decode()`` is awaited on the yielded message.
        """
        start_event, consumed_event, stopped_event = (
            asyncio.Event(),
            asyncio.Event(),
            asyncio.Event(),
        )

        broker = self.get_broker()

        async def custom_decoder(msg, original):
            mock()
            consumed_event.set()
            return await original(msg)

        args, kwargs = self.get_subscriber_params(queue, decoder=custom_decoder)
        sub = broker.subscriber(*args, **kwargs)

        async def iter_messages() -> object:
            await sub.start()
            start_event.set()

            # Stays None if iteration ends without yielding anything, so the
            # caller fails on its assertion rather than on an unbound local.
            msg = None

            # The loop is expected to leave via `break` after one message.
            async for m in sub:
                assert not mock.called
                msg = await m.decode()
                mock.assert_called_once()
                break

            await sub.stop()
            stopped_event.set()
            return msg

        async with broker:
            t = asyncio.create_task(iter_messages())

            with anyio.move_on_after(self.timeout):
                await start_event.wait()
                await broker.publish(b"hello", queue)
                await consumed_event.wait()
                await stopped_event.wait()

            # Bound the wait: if no message arrived the task is still parked
            # in the iterator and a bare `await t` would never return.
            data = await asyncio.wait_for(t, timeout=self.timeout)
            assert data == b"hello", data

    @pytest.mark.connected()
    async def test_get_one_respect_decoder(
        self,
        queue: str,
        event: asyncio.Event,
        mock: MagicMock,
    ) -> None:
        """Fixes https://github.com/ag2ai/faststream/issues/2554.

        ``get_one()`` must return the message without running the custom
        decoder; it runs only when ``decode()`` is awaited on the result.
        """
        broker = self.get_broker()

        async def custom_decoder(msg, original):
            mock()
            return await original(msg)

        args, kwargs = self.get_subscriber_params(queue, decoder=custom_decoder)
        sub = broker.subscriber(*args, **kwargs)

        async def get_msg() -> StreamMessage:
            await sub.start()
            # Signal that the subscriber is started before publishing.
            event.set()
            msg = await sub.get_one(timeout=self.timeout)
            await sub.stop()
            return msg

        async with broker:
            task = asyncio.create_task(get_msg())

            with anyio.move_on_after(self.timeout):
                await event.wait()
                await broker.publish(b"hello", queue)

            await task
            msg = task.result()

        assert not mock.called, mock.call_count
        await msg.decode()
        mock.assert_called_once()
class CustomParserTestcase(LocalCustomParserTestcase):
    """Adds a broker-level (global) parser test to the local parser suite."""

    async def test_global_parser(
        self,
        mock: MagicMock,
        event: asyncio.Event,
        queue: str,
    ) -> None:
        """A parser passed to the broker runs once and sees the parsed body."""

        async def custom_parser(msg, original):
            # Run the original parser, then record the body it produced.
            parsed = await original(msg)
            mock(parsed.body)
            return parsed

        broker = self.get_broker(parser=custom_parser)
        sub_args, sub_kwargs = self.get_subscriber_params(queue)

        @broker.subscriber(*sub_args, **sub_kwargs)
        async def handle(m) -> None:
            event.set()

        async with self.patch_broker(broker) as running:
            await running.start()

            publish_task = asyncio.create_task(running.publish(b"hello", queue))
            handled_task = asyncio.create_task(event.wait())
            await asyncio.wait(
                (publish_task, handled_task),
                timeout=self.timeout,
            )

        assert event.is_set()
        mock.assert_called_once_with(b"hello")