Coverage for tests / brokers / confluent / test_security.py: 33%

6 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from contextlib import contextmanager 

2from unittest.mock import AsyncMock, MagicMock, patch 

3 

4import pytest 

5 

6from faststream.exceptions import SetupError 

7 

8 

9@contextmanager 

10def patch_aio_consumer_and_producer() -> tuple[MagicMock, MagicMock]: 

11 try: 

12 producer = MagicMock(return_value=AsyncMock()) 

13 

14 with patch( 

15 "faststream.confluent.helpers.client.AsyncConfluentProducer", 

16 new=producer, 

17 ): 

18 yield producer 

19 finally: 

20 pass 

21 

22 

23@pytest.mark.confluent() 

24@pytest.mark.asyncio() 

25async def test_base_security_pass_ssl_context() -> None: 

26 import ssl 

27 

28 from faststream.confluent import KafkaBroker 

29 from faststream.security import BaseSecurity 

30 

31 ssl_context = ssl.create_default_context() 

32 security = BaseSecurity(ssl_context=ssl_context) 

33 

34 with pytest.raises( 

35 SetupError, 

36 match=r"ssl_context is not supported by confluent-kafka-python, please use config instead.", 

37 ): 

38 KafkaBroker("localhost:9092", security=security)