Coverage for tests/brokers/mqtt/test_testclient.py: 99%

81 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import pytest 

2 

3from faststream.mqtt.broker.broker import MQTTBroker 

4from faststream.mqtt.testing import FakeProducer, TestMQTTBroker, mqtt_topic_matches 

5from tests.brokers.base.testclient import BrokerTestclientTestcase 

6 

7from .basic import MQTTMemoryTestcaseConfig 

8 

# Skip reason reported by every test below that bails out on MQTT 3.1.1.
_SKIP_V311 = "not supported in MQTT 3.1.1"


@pytest.mark.mqtt()
@pytest.mark.asyncio()
class TestTestclient(MQTTMemoryTestcaseConfig, BrokerTestclientTestcase):
    """Run the shared test-client suite with the MQTT test configuration.

    Most behaviour comes from ``BrokerTestclientTestcase``; the overrides
    here either skip cases on protocol version 3.1.1 or attach the
    ``connected`` marker before deferring to the base implementation.
    """

    def _skip_if_mqtt_v311(self) -> None:
        """Skip the current test when ``self.version`` equals ``"3.1.1"``."""
        # ``self.version`` is not defined in this class; presumably it is
        # provided by MQTTMemoryTestcaseConfig -- confirm against that base.
        if self.version == "3.1.1":
            pytest.skip(_SKIP_V311)

    def get_fake_producer_class(self) -> type:
        """Return the fake producer class used by the MQTT test broker."""
        return FakeProducer

    async def test_consume_with_filter(self, queue, mock):
        """Run the base filter test, skipping it on MQTT 3.1.1."""
        self._skip_if_mqtt_v311()
        await super().test_consume_with_filter(queue, mock)

    async def test_response(self, queue, mock):
        """Run the base response test, skipping it on MQTT 3.1.1."""
        self._skip_if_mqtt_v311()
        await super().test_response(queue, mock)

    async def test_reply_to(self, queue, mock):
        """Run the base reply-to test, skipping it on MQTT 3.1.1."""
        self._skip_if_mqtt_v311()
        await super().test_reply_to(queue, mock)

    # The three overrides below only add the ``connected`` marker; their
    # bodies hand straight over to the base suite.

    @pytest.mark.connected()
    async def test_broker_gets_patched_attrs_within_cm(self) -> None:
        """Run the base patched-attrs test with ``FakeProducer``."""
        await super().test_broker_gets_patched_attrs_within_cm(FakeProducer)

    @pytest.mark.connected()
    async def test_broker_with_real_doesnt_get_patched(self) -> None:
        """Run the base ``with_real`` no-patching test under the marker."""
        await super().test_broker_with_real_doesnt_get_patched()

    @pytest.mark.connected()
    async def test_broker_with_real_patches_publishers_and_subscribers(
        self,
        queue: str,
    ) -> None:
        """Run the base ``with_real`` publisher/subscriber patching test."""
        await super().test_broker_with_real_patches_publishers_and_subscribers(queue)

47 

48 

class TestTopicMatching:
    """Exercise ``mqtt_topic_matches`` with literal and wildcard patterns.

    Every call passes the subscription pattern first and the concrete
    topic second.
    """

    def test_exact_match(self) -> None:
        """A literal pattern matches the identical topic."""
        topic = "sensors/temp"
        assert mqtt_topic_matches("sensors/temp", topic)

    def test_exact_no_match(self) -> None:
        """A literal pattern rejects a topic that differs in one level."""
        topic = "sensors/humidity"
        assert not mqtt_topic_matches("sensors/temp", topic)

    def test_single_level_wildcard(self) -> None:
        """``+`` matches one level and rejects a topic with two levels there."""
        pattern = "sensors/+/temp"
        assert mqtt_topic_matches(pattern, "sensors/room1/temp")
        assert not mqtt_topic_matches(pattern, "sensors/room1/floor2/temp")

    def test_multi_level_wildcard(self) -> None:
        """A trailing ``#`` matches deeper topics and the parent level itself."""
        pattern = "sensors/#"
        matching_topics = (
            "sensors/room1/temp",
            "sensors/room1/floor/temp",
            "sensors",
        )
        for topic in matching_topics:
            assert mqtt_topic_matches(pattern, topic)

    def test_root_hash(self) -> None:
        """A bare ``#`` matches a multi-level topic."""
        topic = "anything/at/all"
        assert mqtt_topic_matches("#", topic)

    def test_shared_subscription(self) -> None:
        """Patterns with a ``$share/<group>/`` prefix match on the remainder."""
        hash_pattern = "$share/workers/sensors/#"
        plus_pattern = "$share/workers/sensors/+"
        assert mqtt_topic_matches(hash_pattern, "sensors/temp")
        assert not mqtt_topic_matches(plus_pattern, "sensors/a/b")

73 

74 

@pytest.mark.mqtt()
@pytest.mark.asyncio()
class TestWildcardRouting:
    """Check how ``TestMQTTBroker`` delivers published messages to subscribers.

    Each test builds a fresh ``MQTTBroker``, registers one handler, wraps the
    broker in ``TestMQTTBroker`` and publishes a single message.
    """

    async def test_plus_wildcard_routes(self, queue: str) -> None:
        """A ``+`` subscription receives a topic with one level in its place."""
        subscription = f"{queue}/+/temp"
        published_topic = f"{queue}/room1/temp"
        broker = MQTTBroker()

        @broker.subscriber(subscription)
        async def handler(temperature: float) -> None:
            pass

        async with TestMQTTBroker(broker) as test_broker:
            await test_broker.start()
            await test_broker.publish("22.5", published_topic)
            # The mock is expected to record the payload as the published string.
            handler.mock.assert_called_once_with("22.5")

    async def test_hash_wildcard_routes(self, queue: str) -> None:
        """A ``#`` subscription receives a topic several levels deeper."""
        subscription = f"{queue}/#"
        published_topic = f"{queue}/a/b/c"
        broker = MQTTBroker()

        @broker.subscriber(subscription)
        async def handler(msg: str) -> None:
            pass

        async with TestMQTTBroker(broker) as test_broker:
            await test_broker.start()
            await test_broker.publish("data", published_topic)
            handler.mock.assert_called_once_with("data")

    async def test_no_match_skips_handler(self, queue: str) -> None:
        """A literal subscription is not invoked for a different topic."""
        subscription = f"{queue}/exact"
        published_topic = f"{queue}/other"
        broker = MQTTBroker()

        @broker.subscriber(subscription)
        async def handler(msg: str) -> None:
            pass

        async with TestMQTTBroker(broker) as test_broker:
            await test_broker.start()
            await test_broker.publish("data", published_topic)
            handler.mock.assert_not_called()

    async def test_shared_subscription_routing(self, queue: str) -> None:
        """A handler in a shared group gets a plain-topic publish exactly once.

        Only one subscriber is registered here, so this checks single
        delivery to that handler rather than distribution across a group.
        """
        topic = f"{queue}/data"
        broker = MQTTBroker()

        # One entry is appended per handler invocation.
        deliveries: list[str] = []

        @broker.subscriber(topic, shared="workers")
        async def handler(msg: str) -> None:
            deliveries.append(msg)

        async with TestMQTTBroker(broker) as test_broker:
            await test_broker.start()
            await test_broker.publish("hello", topic)
            assert len(deliveries) == 1

130 assert call_count == 1