Coverage for faststream / rabbit / security.py: 88%

11 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import Any 

2 

3from faststream.security import BaseSecurity, SASLPlaintext 

4 

5 

def parse_security(security: BaseSecurity | None) -> dict[str, Any]:
    """Convert security object to connection arguments.

    Args:
        security: Security settings to translate, or ``None`` when no
            security is configured.

    Returns:
        An empty dict when ``security`` is ``None``. For a ``SASLPlaintext``
        instance, a dict with ``ssl``, ``ssl_context``, ``login`` and
        ``password``. For any other ``BaseSecurity`` instance, a dict with
        ``ssl`` and ``ssl_context`` only.

    Raises:
        NotImplementedError: If ``security`` is neither ``None`` nor an
            instance of ``BaseSecurity``.
    """
    if security is None:
        return {}

    # The more specific type is tested first so credentials are included.
    # NOTE(review): presumably SASLPlaintext subclasses BaseSecurity, which
    # would make this ordering significant - confirm in faststream.security.
    if isinstance(security, SASLPlaintext):
        return _parse_sasl_plaintext(security)

    if isinstance(security, BaseSecurity):
        return _parse_base_security(security)

    # Defensive fallback for objects that bypass the type annotation; the
    # coverage report shows the test suite never reached this branch.
    msg = f"RabbitBroker does not support {type(security)}"
    raise NotImplementedError(msg)

16 

17 

18def _parse_base_security(security: BaseSecurity) -> dict[str, Any]: 

19 return { 

20 "ssl": security.use_ssl, 

21 "ssl_context": security.ssl_context, 

22 } 

23 

24 

25def _parse_sasl_plaintext(security: SASLPlaintext) -> dict[str, Any]: 

26 return { 

27 "ssl": security.use_ssl, 

28 "ssl_context": security.ssl_context, 

29 "login": security.username, 

30 "password": security.password, 

31 }