Coverage for faststream / rabbit / security.py: 88%
11 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import Any
3from faststream.security import BaseSecurity, SASLPlaintext
6def parse_security(security: BaseSecurity | None) -> dict[str, Any]:
7 """Convert security object to connection arguments."""
8 if security is None:
9 return {}
10 if isinstance(security, SASLPlaintext):
11 return _parse_sasl_plaintext(security)
12 if isinstance(security, BaseSecurity): 12 ↛ 14line 12 didn't jump to line 14 because the condition on line 12 was always true
13 return _parse_base_security(security)
14 msg = f"RabbitBroker does not support {type(security)}"
15 raise NotImplementedError(msg)
18def _parse_base_security(security: BaseSecurity) -> dict[str, Any]:
19 return {
20 "ssl": security.use_ssl,
21 "ssl_context": security.ssl_context,
22 }
25def _parse_sasl_plaintext(security: SASLPlaintext) -> dict[str, Any]:
26 return {
27 "ssl": security.use_ssl,
28 "ssl_context": security.ssl_context,
29 "login": security.username,
30 "password": security.password,
31 }