Coverage for tests / brokers / base / codec.py: 98%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from unittest.mock import MagicMock 

2 

3import pytest 

4 

5from faststream._internal.parser import DefaultCodec 

6from tests.brokers.base.basic import BaseTestcaseConfig 

7 

8 

9@pytest.mark.asyncio() 

10class CodecTestcase(BaseTestcaseConfig): 

11 async def test_codec_decode_called( 

12 self, 

13 mock: MagicMock, 

14 queue: str, 

15 ) -> None: 

16 class TrackingCodec(DefaultCodec): 

17 async def decode(self, msg): 

18 mock() 

19 return await super().decode(msg) 

20 

21 codec = TrackingCodec() 

22 broker = self.get_broker() 

23 

24 args, kwargs = self.get_subscriber_params(queue, codec=codec) 

25 

26 @broker.subscriber(*args, **kwargs) 

27 async def handle(m) -> None: 

28 pass 

29 

30 async with self.patch_broker(broker) as br: 

31 await br.publish(b"hello", queue) 

32 

33 mock.assert_called_once() 

34 

35 async def test_codec_not_set_uses_default( 

36 self, 

37 mock: MagicMock, 

38 queue: str, 

39 ) -> None: 

40 broker = self.get_broker() 

41 

42 args, kwargs = self.get_subscriber_params(queue) 

43 

44 @broker.subscriber(*args, **kwargs) 

45 async def handle(m) -> None: 

46 mock(m) 

47 

48 async with self.patch_broker(broker) as br: 

49 await br.publish({"key": "value"}, queue) 

50 

51 mock.assert_called_once_with({"key": "value"}) 

52 

53 async def test_codec_and_decoder_conflict_raises( 

54 self, 

55 queue: str, 

56 ) -> None: 

57 broker = self.get_broker() 

58 codec = DefaultCodec() 

59 

60 async def my_decoder(msg, original): 

61 return await original(msg) 

62 

63 args, kwargs = self.get_subscriber_params(queue, codec=codec, decoder=my_decoder) 

64 

65 @broker.subscriber(*args, **kwargs) 

66 async def handle(m) -> None: 

67 pass # pragma: no cover 

68 

69 # ValueError raised inside _get_parser_and_decoder() during start(), 

70 # which TestBroker.__aenter__ calls before yielding — hence it propagates 

71 # from the "async with" expression rather than from the body. 

72 with pytest.raises(ValueError, match="codec"): 

73 async with self.patch_broker(broker): 

74 pass # pragma: no cover 

75 

76 async def test_broker_level_codec( 

77 self, 

78 mock: MagicMock, 

79 queue: str, 

80 ) -> None: 

81 class TrackingCodec(DefaultCodec): 

82 async def decode(self, msg): 

83 mock() 

84 return await super().decode(msg) 

85 

86 broker = self.get_broker(codec=TrackingCodec()) 

87 

88 args, kwargs = self.get_subscriber_params(queue) 

89 

90 @broker.subscriber(*args, **kwargs) 

91 async def handle(m) -> None: 

92 pass 

93 

94 async with self.patch_broker(broker) as br: 

95 await br.publish(b"hello", queue) 

96 

97 mock.assert_called_once()