Coverage for tests / brokers / mqtt / test_testclient.py: 99%
81 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import pytest
3from faststream.mqtt.broker.broker import MQTTBroker
4from faststream.mqtt.testing import FakeProducer, TestMQTTBroker, mqtt_topic_matches
5from tests.brokers.base.testclient import BrokerTestclientTestcase
7from .basic import MQTTMemoryTestcaseConfig
9_SKIP_V311 = "not supported in MQTT 3.1.1"
12@pytest.mark.mqtt()
13@pytest.mark.asyncio()
14class TestTestclient(MQTTMemoryTestcaseConfig, BrokerTestclientTestcase):
15 def get_fake_producer_class(self) -> type:
16 return FakeProducer
18 async def test_consume_with_filter(self, queue, mock):
19 if self.version == "3.1.1":
20 pytest.skip(_SKIP_V311)
21 await super().test_consume_with_filter(queue, mock)
23 async def test_response(self, queue, mock):
24 if self.version == "3.1.1":
25 pytest.skip(_SKIP_V311)
26 await super().test_response(queue, mock)
28 async def test_reply_to(self, queue, mock):
29 if self.version == "3.1.1":
30 pytest.skip(_SKIP_V311)
31 await super().test_reply_to(queue, mock)
33 @pytest.mark.connected()
34 async def test_broker_gets_patched_attrs_within_cm(self) -> None:
35 await super().test_broker_gets_patched_attrs_within_cm(FakeProducer)
37 @pytest.mark.connected()
38 async def test_broker_with_real_doesnt_get_patched(self) -> None:
39 await super().test_broker_with_real_doesnt_get_patched()
41 @pytest.mark.connected()
42 async def test_broker_with_real_patches_publishers_and_subscribers(
43 self,
44 queue: str,
45 ) -> None:
46 await super().test_broker_with_real_patches_publishers_and_subscribers(queue)
49class TestTopicMatching:
50 """Unit tests for the MQTT wildcard matching helper."""
52 def test_exact_match(self) -> None:
53 assert mqtt_topic_matches("sensors/temp", "sensors/temp")
55 def test_exact_no_match(self) -> None:
56 assert not mqtt_topic_matches("sensors/temp", "sensors/humidity")
58 def test_single_level_wildcard(self) -> None:
59 assert mqtt_topic_matches("sensors/+/temp", "sensors/room1/temp")
60 assert not mqtt_topic_matches("sensors/+/temp", "sensors/room1/floor2/temp")
62 def test_multi_level_wildcard(self) -> None:
63 assert mqtt_topic_matches("sensors/#", "sensors/room1/temp")
64 assert mqtt_topic_matches("sensors/#", "sensors/room1/floor/temp")
65 assert mqtt_topic_matches("sensors/#", "sensors")
67 def test_root_hash(self) -> None:
68 assert mqtt_topic_matches("#", "anything/at/all")
70 def test_shared_subscription(self) -> None:
71 assert mqtt_topic_matches("$share/workers/sensors/#", "sensors/temp")
72 assert not mqtt_topic_matches("$share/workers/sensors/+", "sensors/a/b")
75@pytest.mark.mqtt()
76@pytest.mark.asyncio()
77class TestWildcardRouting:
78 """Verify that TestMQTTBroker routes wildcard topics correctly."""
80 async def test_plus_wildcard_routes(self, queue: str) -> None:
81 broker = MQTTBroker()
83 @broker.subscriber(f"{queue}/+/temp")
84 async def handler(temperature: float) -> None:
85 pass
87 async with TestMQTTBroker(broker) as br:
88 await br.start()
89 await br.publish("22.5", f"{queue}/room1/temp")
90 handler.mock.assert_called_once_with("22.5")
92 async def test_hash_wildcard_routes(self, queue: str) -> None:
93 broker = MQTTBroker()
95 @broker.subscriber(f"{queue}/#")
96 async def handler(msg: str) -> None:
97 pass
99 async with TestMQTTBroker(broker) as br:
100 await br.start()
101 await br.publish("data", f"{queue}/a/b/c")
102 handler.mock.assert_called_once_with("data")
104 async def test_no_match_skips_handler(self, queue: str) -> None:
105 broker = MQTTBroker()
107 @broker.subscriber(f"{queue}/exact")
108 async def handler(msg: str) -> None:
109 pass
111 async with TestMQTTBroker(broker) as br:
112 await br.start()
113 await br.publish("data", f"{queue}/other")
114 handler.mock.assert_not_called()
116 async def test_shared_subscription_routing(self, queue: str) -> None:
117 """Messages to a shared group topic should reach exactly one subscriber."""
118 broker = MQTTBroker()
120 call_count = 0
122 @broker.subscriber(f"{queue}/data", shared="workers")
123 async def handler(msg: str) -> None:
124 nonlocal call_count
125 call_count += 1
127 async with TestMQTTBroker(broker) as br:
128 await br.start()
129 await br.publish("hello", f"{queue}/data")
130 assert call_count == 1