Coverage for tests/brokers/mqtt/test_consume.py: 97%
30 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from unittest.mock import MagicMock
4import pytest
6from tests.brokers.base.consume import BrokerRealConsumeTestcase
8from .basic import MQTTTestcaseConfig
@pytest.mark.connected()
@pytest.mark.mqtt()
@pytest.mark.asyncio()
class TestConsume(MQTTTestcaseConfig, BrokerRealConsumeTestcase):
    """Consume tests combining MQTTTestcaseConfig with BrokerRealConsumeTestcase.

    Only the tests that need MQTT-specific handling are overridden here;
    everything else comes from the base classes.
    """

    async def test_consume_with_filter(self, queue, mock):
        """Run the inherited filter test unless the MQTT version is 3.1.1.

        The test is skipped for ``self.version == "3.1.1"`` because
        content_type filtering is not supported there.
        """
        if self.version == "3.1.1":
            pytest.skip("content_type filtering not supported in MQTT 3.1.1")
        await super().test_consume_with_filter(queue, mock)

    @pytest.mark.asyncio()
    async def test_iteration(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Check that iterating over a subscriber yields every published message.

        Two messages are published to ``queue`` while a concurrent task
        iterates the subscriber and records each decoded payload on ``mock``.
        The recorded payloads are compared as a set.

        Args:
            queue: Name passed to ``get_subscriber_params`` and ``publish``.
            mock: Mock that is called once with each decoded message.

        Raises:
            AssertionError: If the received payloads differ from the
                published ones (including when the wait timed out).
            Exception: Any error raised inside the publish or consume task
                is re-raised instead of being silently dropped.
        """
        # Overridden since order is not guaranteed in MQTT.
        expected_messages = {"test_message_1", "test_message_2"}

        broker = self.get_broker(apply_types=True)

        args, kwargs = self.get_subscriber_params(queue)
        subscriber = broker.subscriber(*args, **kwargs)

        async with self.patch_broker(broker) as br:
            await br.start()

            async def publish_test_message():
                for msg in expected_messages:
                    await br.publish(msg, queue)

            async def consume():
                received = 0
                async for msg in subscriber:
                    mock(await msg.decode())

                    received += 1
                    # Stop explicitly once everything expected has arrived;
                    # otherwise the iteration would keep waiting for messages.
                    if received >= len(expected_messages):
                        break

            done, pending = await asyncio.wait(
                (
                    asyncio.create_task(consume()),
                    asyncio.create_task(publish_test_message()),
                ),
                timeout=self.timeout,
            )

            # asyncio.wait() does not cancel unfinished tasks on timeout, so
            # cancel and drain them here to avoid leaking a pending task past
            # the end of the test.
            if pending:
                for task in pending:
                    task.cancel()
                await asyncio.gather(*pending, return_exceptions=True)

            # asyncio.wait() also never re-raises task exceptions; calling
            # result() surfaces the real failure instead of a set mismatch.
            for task in done:
                task.result()

            msgs = {call.args[0] for call in mock.mock_calls}
            assert msgs == expected_messages