Coverage for tests/brokers/mqtt/test_consume.py: 97%

30 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from unittest.mock import MagicMock 

3 

4import pytest 

5 

6from tests.brokers.base.consume import BrokerRealConsumeTestcase 

7 

8from .basic import MQTTTestcaseConfig 

9 

10 

@pytest.mark.connected()
@pytest.mark.mqtt()
@pytest.mark.asyncio()
class TestConsume(MQTTTestcaseConfig, BrokerRealConsumeTestcase):
    """Consume tests from ``BrokerRealConsumeTestcase`` run with the MQTT config.

    Two inherited tests are overridden here: the filter test is skipped for
    MQTT 3.1.1, and the iteration test compares messages as a set because
    delivery order is not guaranteed in MQTT.
    """

    async def test_consume_with_filter(self, queue, mock):
        """Run the inherited filter test unless the protocol version is 3.1.1."""
        if self.version == "3.1.1":
            pytest.skip("content_type filtering not supported in MQTT 3.1.1")
        await super().test_consume_with_filter(queue, mock)

    @pytest.mark.asyncio()
    async def test_iteration(
        self,
        queue: str,
        mock: MagicMock,
    ) -> None:
        """Iterate over a subscriber and check every published message arrives.

        Args:
            queue: Name the subscriber listens on and messages are published to.
            mock: Records each decoded message so the received set can be checked.
        """
        # Overridden since order is not guaranteed in MQTT.
        expected_messages = {"test_message_1", "test_message_2"}

        broker = self.get_broker(apply_types=True)

        args, kwargs = self.get_subscriber_params(queue)
        subscriber = broker.subscriber(*args, **kwargs)

        async with self.patch_broker(broker) as br:
            await br.start()

            async def publish_test_message():
                for msg in expected_messages:
                    await br.publish(msg, queue)

            async def consume():
                index_message = 0
                async for msg in subscriber:
                    result_message = await msg.decode()

                    mock(result_message)

                    # Stop iterating once every expected message was seen;
                    # the subscriber stream itself never ends on its own.
                    index_message += 1
                    if index_message >= len(expected_messages):
                        break

            _, pending = await asyncio.wait(
                (
                    asyncio.create_task(consume()),
                    asyncio.create_task(publish_test_message()),
                ),
                timeout=self.timeout,
            )

            # asyncio.wait() does not cancel tasks on timeout; without this a
            # consumer still blocked on the subscriber would leak past the
            # test and outlive the broker it is reading from.
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)

            msgs = {call.args[0] for call in mock.mock_calls}
            assert msgs == expected_messages

61 assert msgs == expected_messages