Coverage for faststream / confluent / security.py: 95%
28 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import Any
3from faststream.exceptions import SetupError
4from faststream.security import (
5 SASLGSSAPI,
6 BaseSecurity,
7 SASLOAuthBearer,
8 SASLPlaintext,
9 SASLScram256,
10 SASLScram512,
11)
14def parse_security(security: BaseSecurity | None) -> dict[str, Any]:
15 if security is None:
16 return {}
18 if security.ssl_context:
19 msg = "ssl_context is not supported by confluent-kafka-python, please use config instead."
20 raise SetupError(msg)
22 if isinstance(security, SASLPlaintext):
23 return _parse_sasl_plaintext(security)
24 if isinstance(security, SASLScram256):
25 return _parse_sasl_scram256(security)
26 if isinstance(security, SASLScram512):
27 return _parse_sasl_scram512(security)
28 if isinstance(security, SASLOAuthBearer):
29 return _parse_sasl_oauthbearer(security)
30 if isinstance(security, SASLGSSAPI):
31 return _parse_sasl_gssapi(security)
32 if isinstance(security, BaseSecurity): 32 ↛ 35line 32 didn't jump to line 35 because the condition on line 32 was always true
33 return _parse_base_security(security)
35 msg = f"KafkaBroker does not support `{type(security)}`."
36 raise NotImplementedError(msg)
39def _parse_base_security(security: BaseSecurity) -> dict[str, Any]:
40 return {
41 "security.protocol": "ssl" if security.use_ssl else "plaintext",
42 }
45def _parse_sasl_plaintext(security: SASLPlaintext) -> dict[str, Any]:
46 return {
47 "security.protocol": "sasl_ssl" if security.use_ssl else "sasl_plaintext",
48 "sasl.mechanism": "PLAIN",
49 "sasl.username": security.username,
50 "sasl.password": security.password,
51 }
54def _parse_sasl_scram256(security: SASLScram256) -> dict[str, Any]:
55 return {
56 "security.protocol": "sasl_ssl" if security.use_ssl else "sasl_plaintext",
57 "sasl.mechanism": "SCRAM-SHA-256",
58 "sasl.username": security.username,
59 "sasl.password": security.password,
60 }
63def _parse_sasl_scram512(security: SASLScram512) -> dict[str, Any]:
64 return {
65 "security.protocol": "sasl_ssl" if security.use_ssl else "sasl_plaintext",
66 "sasl.mechanism": "SCRAM-SHA-512",
67 "sasl.username": security.username,
68 "sasl.password": security.password,
69 }
72def _parse_sasl_oauthbearer(security: SASLOAuthBearer) -> dict[str, Any]:
73 return {
74 "security.protocol": "sasl_ssl" if security.use_ssl else "sasl_plaintext",
75 "sasl.mechanism": "OAUTHBEARER",
76 }
79def _parse_sasl_gssapi(security: SASLGSSAPI) -> dict[str, Any]:
80 return {
81 "security.protocol": "sasl_ssl" if security.use_ssl else "sasl_plaintext",
82 "sasl.mechanism": "GSSAPI",
83 }