Coverage for faststream / kafka / security.py: 95%
25 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import Any
3from faststream.security import (
4 SASLGSSAPI,
5 BaseSecurity,
6 SASLOAuthBearer,
7 SASLPlaintext,
8 SASLScram256,
9 SASLScram512,
10)
13def parse_security(security: BaseSecurity | None) -> dict[str, Any]:
14 if security is None:
15 return {}
16 if isinstance(security, SASLPlaintext):
17 return _parse_sasl_plaintext(security)
18 if isinstance(security, SASLScram256):
19 return _parse_sasl_scram256(security)
20 if isinstance(security, SASLScram512):
21 return _parse_sasl_scram512(security)
22 if isinstance(security, SASLOAuthBearer):
23 return _parse_sasl_oauthbearer(security)
24 if isinstance(security, SASLGSSAPI):
25 return _parse_sasl_gssapi(security)
26 if isinstance(security, BaseSecurity): 26 ↛ 28line 26 didn't jump to line 28 because the condition on line 26 was always true
27 return _parse_base_security(security)
28 msg = f"KafkaBroker does not support `{type(security)}`."
29 raise NotImplementedError(msg)
32def _parse_base_security(security: BaseSecurity) -> dict[str, Any]:
33 return {
34 "security_protocol": "SSL" if security.use_ssl else "PLAINTEXT",
35 "ssl_context": security.ssl_context,
36 }
39def _parse_sasl_plaintext(security: SASLPlaintext) -> dict[str, Any]:
40 return {
41 "security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
42 "ssl_context": security.ssl_context,
43 "sasl_mechanism": "PLAIN",
44 "sasl_plain_username": security.username,
45 "sasl_plain_password": security.password,
46 }
49def _parse_sasl_scram256(security: SASLScram256) -> dict[str, Any]:
50 return {
51 "security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
52 "ssl_context": security.ssl_context,
53 "sasl_mechanism": "SCRAM-SHA-256",
54 "sasl_plain_username": security.username,
55 "sasl_plain_password": security.password,
56 }
59def _parse_sasl_scram512(security: SASLScram512) -> dict[str, Any]:
60 return {
61 "security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
62 "ssl_context": security.ssl_context,
63 "sasl_mechanism": "SCRAM-SHA-512",
64 "sasl_plain_username": security.username,
65 "sasl_plain_password": security.password,
66 }
69def _parse_sasl_oauthbearer(security: SASLOAuthBearer) -> dict[str, Any]:
70 return {
71 "security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
72 "ssl_context": security.ssl_context,
73 "sasl_mechanism": "OAUTHBEARER",
74 }
77def _parse_sasl_gssapi(security: SASLGSSAPI) -> dict[str, Any]:
78 return {
79 "security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
80 "ssl_context": security.ssl_context,
81 "sasl_mechanism": "GSSAPI",
82 }