Coverage for faststream / confluent / security.py: 95%

28 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import Any 

2 

3from faststream.exceptions import SetupError 

4from faststream.security import ( 

5 SASLGSSAPI, 

6 BaseSecurity, 

7 SASLOAuthBearer, 

8 SASLPlaintext, 

9 SASLScram256, 

10 SASLScram512, 

11) 

12 

13 

def parse_security(security: BaseSecurity | None) -> dict[str, Any]:
    """Translate a security object into confluent-kafka style config entries.

    Args:
        security: The security settings to translate, or ``None`` when no
            security has been configured.

    Returns:
        A dict of config keys. It is empty when ``security`` is ``None``;
        otherwise it contains ``security.protocol`` and, for the SASL
        variants, the matching ``sasl.*`` entries.

    Raises:
        SetupError: If ``security.ssl_context`` is truthy.
        NotImplementedError: If ``security`` is not an instance of any of
            the handled security classes.
    """
    if security is None:
        return {}

    if security.ssl_context:
        msg = "ssl_context is not supported by confluent-kafka-python, please use config instead."
        raise SetupError(msg)

    # The specific SASL classes are tested first; BaseSecurity is tested last
    # so that it only handles objects none of the SASL checks claimed.
    # NOTE(review): presumably the SASL classes subclass BaseSecurity, which
    # is why the order matters - confirm against faststream.security.
    if isinstance(security, SASLPlaintext):
        return _parse_sasl_plaintext(security)
    if isinstance(security, SASLScram256):
        return _parse_sasl_scram256(security)
    if isinstance(security, SASLScram512):
        return _parse_sasl_scram512(security)
    if isinstance(security, SASLOAuthBearer):
        return _parse_sasl_oauthbearer(security)
    if isinstance(security, SASLGSSAPI):
        return _parse_sasl_gssapi(security)
    if isinstance(security, BaseSecurity):
        return _parse_base_security(security)

    # Only reached for objects that are not BaseSecurity instances at all.
    msg = f"KafkaBroker does not support `{type(security)}`."
    raise NotImplementedError(msg)

37 

38 

def _parse_base_security(security: BaseSecurity) -> dict[str, Any]:
    """Build the config for a plain (non-SASL) security object.

    Args:
        security: Security settings; only ``use_ssl`` is read.

    Returns:
        A dict with a single ``security.protocol`` key, set to ``"ssl"``
        when ``security.use_ssl`` is truthy and ``"plaintext"`` otherwise.
    """
    return {
        "security.protocol": "ssl" if security.use_ssl else "plaintext",
    }

43 

44 

def _parse_sasl_plaintext(security: SASLPlaintext) -> dict[str, Any]:
    """Build the config for SASL authentication with the ``PLAIN`` mechanism.

    Args:
        security: Security settings; ``use_ssl``, ``username`` and
            ``password`` are read.

    Returns:
        A dict with ``security.protocol`` (``"sasl_ssl"`` when
        ``security.use_ssl`` is truthy, else ``"sasl_plaintext"``),
        ``sasl.mechanism`` set to ``"PLAIN"``, and the username/password
        copied through unchanged.
    """
    return {
        "security.protocol": "sasl_ssl" if security.use_ssl else "sasl_plaintext",
        "sasl.mechanism": "PLAIN",
        "sasl.username": security.username,
        "sasl.password": security.password,
    }

52 

53 

def _parse_sasl_scram256(security: SASLScram256) -> dict[str, Any]:
    """Build the config for SASL authentication with ``SCRAM-SHA-256``.

    Args:
        security: Security settings; ``use_ssl``, ``username`` and
            ``password`` are read.

    Returns:
        A dict with ``security.protocol`` (``"sasl_ssl"`` when
        ``security.use_ssl`` is truthy, else ``"sasl_plaintext"``),
        ``sasl.mechanism`` set to ``"SCRAM-SHA-256"``, and the
        username/password copied through unchanged.
    """
    return {
        "security.protocol": "sasl_ssl" if security.use_ssl else "sasl_plaintext",
        "sasl.mechanism": "SCRAM-SHA-256",
        "sasl.username": security.username,
        "sasl.password": security.password,
    }

61 

62 

def _parse_sasl_scram512(security: SASLScram512) -> dict[str, Any]:
    """Build the config for SASL authentication with ``SCRAM-SHA-512``.

    Args:
        security: Security settings; ``use_ssl``, ``username`` and
            ``password`` are read.

    Returns:
        A dict with ``security.protocol`` (``"sasl_ssl"`` when
        ``security.use_ssl`` is truthy, else ``"sasl_plaintext"``),
        ``sasl.mechanism`` set to ``"SCRAM-SHA-512"``, and the
        username/password copied through unchanged.
    """
    return {
        "security.protocol": "sasl_ssl" if security.use_ssl else "sasl_plaintext",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": security.username,
        "sasl.password": security.password,
    }

70 

71 

def _parse_sasl_oauthbearer(security: SASLOAuthBearer) -> dict[str, Any]:
    """Build the config for SASL authentication with ``OAUTHBEARER``.

    Only the protocol and mechanism are emitted here; no token or
    credential entries are produced by this function.
    NOTE(review): token retrieval is presumably configured elsewhere by the
    caller - confirm.

    Args:
        security: Security settings; only ``use_ssl`` is read.

    Returns:
        A dict with ``security.protocol`` (``"sasl_ssl"`` when
        ``security.use_ssl`` is truthy, else ``"sasl_plaintext"``) and
        ``sasl.mechanism`` set to ``"OAUTHBEARER"``.
    """
    return {
        "security.protocol": "sasl_ssl" if security.use_ssl else "sasl_plaintext",
        "sasl.mechanism": "OAUTHBEARER",
    }

77 

78 

def _parse_sasl_gssapi(security: SASLGSSAPI) -> dict[str, Any]:
    """Build the config for SASL authentication with ``GSSAPI``.

    Only the protocol and mechanism are emitted here; no other ``sasl.*``
    entries are produced by this function.

    Args:
        security: Security settings; only ``use_ssl`` is read.

    Returns:
        A dict with ``security.protocol`` (``"sasl_ssl"`` when
        ``security.use_ssl`` is truthy, else ``"sasl_plaintext"``) and
        ``sasl.mechanism`` set to ``"GSSAPI"``.
    """
    return {
        "security.protocol": "sasl_ssl" if security.use_ssl else "sasl_plaintext",
        "sasl.mechanism": "GSSAPI",
    }