Coverage for tests / brokers / confluent / test_security.py: 33%
6 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from contextlib import contextmanager
2from unittest.mock import AsyncMock, MagicMock, patch
4import pytest
6from faststream.exceptions import SetupError
9@contextmanager
10def patch_aio_consumer_and_producer() -> tuple[MagicMock, MagicMock]:
11 try:
12 producer = MagicMock(return_value=AsyncMock())
14 with patch(
15 "faststream.confluent.helpers.client.AsyncConfluentProducer",
16 new=producer,
17 ):
18 yield producer
19 finally:
20 pass
23@pytest.mark.confluent()
24@pytest.mark.asyncio()
25async def test_base_security_pass_ssl_context() -> None:
26 import ssl
28 from faststream.confluent import KafkaBroker
29 from faststream.security import BaseSecurity
31 ssl_context = ssl.create_default_context()
32 security = BaseSecurity(ssl_context=ssl_context)
34 with pytest.raises(
35 SetupError,
36 match=r"ssl_context is not supported by confluent-kafka-python, please use config instead.",
37 ):
38 KafkaBroker("localhost:9092", security=security)