Coverage for tests / brokers / base / codec.py: 98%
46 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from unittest.mock import MagicMock
3import pytest
5from faststream._internal.parser import DefaultCodec
6from tests.brokers.base.basic import BaseTestcaseConfig
@pytest.mark.asyncio()
class CodecTestcase(BaseTestcaseConfig):
    """Shared codec test scenarios, run through ``self.patch_broker``.

    The broker and subscriber parameters come from ``self.get_broker`` and
    ``self.get_subscriber_params``, both provided by the base class.
    """

    async def test_codec_decode_called(
        self,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """A codec passed to the subscriber has ``decode`` called exactly once."""

        class TrackingCodec(DefaultCodec):
            async def decode(self, msg):
                # Record the call first, then defer to the stock decoding.
                mock()
                decoded = await super().decode(msg)
                return decoded

        broker = self.get_broker()
        tracking_codec = TrackingCodec()

        sub_args, sub_kwargs = self.get_subscriber_params(
            queue,
            codec=tracking_codec,
        )

        @broker.subscriber(*sub_args, **sub_kwargs)
        async def handle(m) -> None:
            pass

        async with self.patch_broker(broker) as patched:
            await patched.publish(b"hello", queue)

        mock.assert_called_once()

    async def test_codec_not_set_uses_default(
        self,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """With no codec configured, the handler receives the published dict."""
        payload = {"key": "value"}
        broker = self.get_broker()

        sub_args, sub_kwargs = self.get_subscriber_params(queue)

        @broker.subscriber(*sub_args, **sub_kwargs)
        async def handle(m) -> None:
            mock(m)

        async with self.patch_broker(broker) as patched:
            await patched.publish(payload, queue)

        mock.assert_called_once_with(payload)

    async def test_codec_and_decoder_conflict_raises(
        self,
        queue: str,
    ) -> None:
        """Passing both ``codec`` and ``decoder`` to a subscriber raises ValueError."""

        async def my_decoder(msg, original):
            return await original(msg)

        default_codec = DefaultCodec()
        broker = self.get_broker()

        sub_args, sub_kwargs = self.get_subscriber_params(
            queue,
            codec=default_codec,
            decoder=my_decoder,
        )

        @broker.subscriber(*sub_args, **sub_kwargs)
        async def handle(m) -> None:
            pass  # pragma: no cover

        # The ValueError originates in _get_parser_and_decoder() while start()
        # runs; TestBroker.__aenter__ invokes start() before it yields, so the
        # error surfaces from the "async with" expression itself and the body
        # below is never entered.
        with pytest.raises(ValueError, match="codec"):
            async with self.patch_broker(broker):
                pass  # pragma: no cover

    async def test_broker_level_codec(
        self,
        mock: MagicMock,
        queue: str,
    ) -> None:
        """A codec passed to the broker has ``decode`` called exactly once."""

        class TrackingCodec(DefaultCodec):
            async def decode(self, msg):
                # Record the call first, then defer to the stock decoding.
                mock()
                decoded = await super().decode(msg)
                return decoded

        tracking_codec = TrackingCodec()
        broker = self.get_broker(codec=tracking_codec)

        sub_args, sub_kwargs = self.get_subscriber_params(queue)

        @broker.subscriber(*sub_args, **sub_kwargs)
        async def handle(m) -> None:
            pass

        async with self.patch_broker(broker) as patched:
            await patched.publish(b"hello", queue)

        mock.assert_called_once()