Coverage for tests/docs/confluent/publish_with_partition_key/test_app.py: 73%
15 statements
« prev ^ index » next | coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import pytest
3from docs.docs_src.confluent.publish_with_partition_key.app import (
4 Data,
5 broker,
6 on_input_data,
7 to_output_data,
8)
9from faststream.confluent import TestKafkaBroker
@pytest.mark.confluent()
@pytest.mark.asyncio()
async def test_app() -> None:
    """Publish one keyed message and verify both handler mocks.

    Inside the ``TestKafkaBroker(broker)`` context, ``Data(data=0.2)`` is
    published to ``"input_data"`` with the key ``b"my_key"``. The test then
    asserts that ``on_input_data`` was called exactly once with that payload
    as a dict, and that ``to_output_data`` was called exactly once with
    ``Data(data=1.2)`` as a dict. The message key itself is not asserted.
    """
    sent = Data(data=0.2)
    expected_output = Data(data=1.2)

    async with TestKafkaBroker(broker):
        await broker.publish(sent, "input_data", key=b"my_key")

        # Each handler mock must have recorded exactly one call.
        on_input_data.mock.assert_called_once_with(dict(sent))
        to_output_data.mock.assert_called_once_with(dict(expected_output))
@pytest.mark.confluent()
@pytest.mark.skip("we are not checking the key")
@pytest.mark.asyncio()
async def test_keys() -> None:
    """Verify handler calls including the message key (currently skipped).

    Publishes ``Data(data=0.2)`` to ``"input_data"`` with the key
    ``b"my_key"`` and asserts that each handler mock was called exactly once
    with the payload dict plus a ``key`` keyword argument: ``b"my_key"`` for
    ``on_input_data`` and ``b"key"`` for ``to_output_data``.
    """
    sent = Data(data=0.2)
    expected_output = Data(data=1.2)

    async with TestKafkaBroker(broker):
        # Publishing with an explicit key must be accepted by the broker.
        await broker.publish(sent, "input_data", key=b"my_key")

        # Unlike test_app, the key is part of the expected call here.
        on_input_data.mock.assert_called_once_with(dict(sent), key=b"my_key")
        to_output_data.mock.assert_called_once_with(
            dict(expected_output),
            key=b"key",
        )