Coverage for tests / docs / confluent / publish_with_partition_key / test_app.py: 73%

15 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import pytest 

2 

3from docs.docs_src.confluent.publish_with_partition_key.app import ( 

4 Data, 

5 broker, 

6 on_input_data, 

7 to_output_data, 

8) 

9from faststream.confluent import TestKafkaBroker 

10 

11 

12@pytest.mark.confluent() 

13@pytest.mark.asyncio() 

14async def test_app() -> None: 

15 async with TestKafkaBroker(broker): 

16 await broker.publish(Data(data=0.2), "input_data", key=b"my_key") 

17 

18 on_input_data.mock.assert_called_once_with(dict(Data(data=0.2))) 

19 to_output_data.mock.assert_called_once_with(dict(Data(data=1.2))) 

20 

21 

22@pytest.mark.confluent() 

23@pytest.mark.skip("we are not checking the key") 

24@pytest.mark.asyncio() 

25async def test_keys() -> None: 

26 async with TestKafkaBroker(broker): 

27 # we should be able to publish a message with the key 

28 await broker.publish(Data(data=0.2), "input_data", key=b"my_key") 

29 

30 # we need to check the key as well 

31 on_input_data.mock.assert_called_once_with(dict(Data(data=0.2)), key=b"my_key") 

32 to_output_data.mock.assert_called_once_with(dict(Data(data=1.2)), key=b"key")