Coverage for faststream / mqtt / security.py: 52%
13 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import Any
3from faststream.security import BaseSecurity, SASLPlaintext
6def parse_security(security: BaseSecurity | None) -> dict[str, Any]:
7 if security is None: 7 ↛ 9line 7 didn't jump to line 9 because the condition on line 7 was always true
8 return {}
9 if isinstance(security, SASLPlaintext):
10 return _parse_sasl_plaintext(security)
11 if isinstance(security, BaseSecurity):
12 return _parse_base_security(security)
13 msg = f"MQTTBroker does not support {type(security)}"
14 raise NotImplementedError(msg)
17def _parse_base_security(security: BaseSecurity) -> dict[str, Any]:
18 if security.use_ssl:
19 return {"tls": security.ssl_context or True}
20 return {"tls": False}
23def _parse_sasl_plaintext(security: SASLPlaintext) -> dict[str, Any]:
24 return {
25 **_parse_base_security(security),
26 "username": security.username,
27 "password": security.password,
28 }