Coverage for faststream / kafka / security.py: 95%

25 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import Any 

2 

3from faststream.security import ( 

4 SASLGSSAPI, 

5 BaseSecurity, 

6 SASLOAuthBearer, 

7 SASLPlaintext, 

8 SASLScram256, 

9 SASLScram512, 

10) 

11 

12 

13def parse_security(security: BaseSecurity | None) -> dict[str, Any]: 

14 if security is None: 

15 return {} 

16 if isinstance(security, SASLPlaintext): 

17 return _parse_sasl_plaintext(security) 

18 if isinstance(security, SASLScram256): 

19 return _parse_sasl_scram256(security) 

20 if isinstance(security, SASLScram512): 

21 return _parse_sasl_scram512(security) 

22 if isinstance(security, SASLOAuthBearer): 

23 return _parse_sasl_oauthbearer(security) 

24 if isinstance(security, SASLGSSAPI): 

25 return _parse_sasl_gssapi(security) 

26 if isinstance(security, BaseSecurity): 26 ↛ 28line 26 didn't jump to line 28 because the condition on line 26 was always true

27 return _parse_base_security(security) 

28 msg = f"KafkaBroker does not support `{type(security)}`." 

29 raise NotImplementedError(msg) 

30 

31 

def _parse_base_security(security: BaseSecurity) -> dict[str, Any]:
    """Build the options for a security object without SASL settings.

    Args:
        security: Object exposing ``use_ssl`` and ``ssl_context``.

    Returns:
        A dict with ``security_protocol`` (``"SSL"`` when ``use_ssl`` is
        truthy, else ``"PLAINTEXT"``) and ``ssl_context`` passed through
        unchanged.
    """
    protocol = "SSL" if security.use_ssl else "PLAINTEXT"
    return {"security_protocol": protocol, "ssl_context": security.ssl_context}

37 

38 

def _parse_sasl_plaintext(security: SASLPlaintext) -> dict[str, Any]:
    """Build the options for SASL authentication with the PLAIN mechanism.

    Args:
        security: Object exposing ``use_ssl``, ``ssl_context``,
            ``username`` and ``password``.

    Returns:
        A dict with ``security_protocol`` (``"SASL_SSL"`` when ``use_ssl``
        is truthy, else ``"SASL_PLAINTEXT"``), ``ssl_context``,
        ``sasl_mechanism`` fixed to ``"PLAIN"``, and the credentials under
        ``sasl_plain_username`` / ``sasl_plain_password``.
    """
    if security.use_ssl:
        protocol = "SASL_SSL"
    else:
        protocol = "SASL_PLAINTEXT"

    options: dict[str, Any] = {"security_protocol": protocol}
    options["ssl_context"] = security.ssl_context
    options["sasl_mechanism"] = "PLAIN"
    options["sasl_plain_username"] = security.username
    options["sasl_plain_password"] = security.password
    return options

47 

48 

def _parse_sasl_scram256(security: SASLScram256) -> dict[str, Any]:
    """Build the options for SASL authentication with SCRAM-SHA-256.

    Args:
        security: Object exposing ``use_ssl``, ``ssl_context``,
            ``username`` and ``password``.

    Returns:
        A dict with ``security_protocol`` (``"SASL_SSL"`` when ``use_ssl``
        is truthy, else ``"SASL_PLAINTEXT"``), ``ssl_context``,
        ``sasl_mechanism`` fixed to ``"SCRAM-SHA-256"``, and the
        credentials under ``sasl_plain_username`` / ``sasl_plain_password``.
    """
    # Attributes are read in the same order the result keys are laid out.
    protocol = "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT"
    context = security.ssl_context
    user = security.username
    secret = security.password
    return {
        "security_protocol": protocol,
        "ssl_context": context,
        "sasl_mechanism": "SCRAM-SHA-256",
        "sasl_plain_username": user,
        "sasl_plain_password": secret,
    }

57 

58 

def _parse_sasl_scram512(security: SASLScram512) -> dict[str, Any]:
    """Build the options for SASL authentication with SCRAM-SHA-512.

    Args:
        security: Object exposing ``use_ssl``, ``ssl_context``,
            ``username`` and ``password``.

    Returns:
        A dict with ``security_protocol`` (``"SASL_SSL"`` when ``use_ssl``
        is truthy, else ``"SASL_PLAINTEXT"``), ``ssl_context``,
        ``sasl_mechanism`` fixed to ``"SCRAM-SHA-512"``, and the
        credentials under ``sasl_plain_username`` / ``sasl_plain_password``.
    """
    transport: dict[str, Any] = {
        "security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
        "ssl_context": security.ssl_context,
    }
    credentials: dict[str, Any] = {
        "sasl_mechanism": "SCRAM-SHA-512",
        "sasl_plain_username": security.username,
        "sasl_plain_password": security.password,
    }
    # Merge keeps transport keys first, then the SASL keys.
    return transport | credentials

67 

68 

def _parse_sasl_oauthbearer(security: SASLOAuthBearer) -> dict[str, Any]:
    """Build the options for SASL authentication with OAUTHBEARER.

    Args:
        security: Object exposing ``use_ssl`` and ``ssl_context``.

    Returns:
        A dict with ``security_protocol`` (``"SASL_SSL"`` when ``use_ssl``
        is truthy, else ``"SASL_PLAINTEXT"``), ``ssl_context`` passed
        through unchanged, and ``sasl_mechanism`` fixed to
        ``"OAUTHBEARER"``. No credential keys are emitted here.
    """
    protocol = "SASL_PLAINTEXT"
    if security.use_ssl:
        protocol = "SASL_SSL"
    return {
        "security_protocol": protocol,
        "ssl_context": security.ssl_context,
        "sasl_mechanism": "OAUTHBEARER",
    }

75 

76 

def _parse_sasl_gssapi(security: SASLGSSAPI) -> dict[str, Any]:
    """Build the options for SASL authentication with GSSAPI.

    Args:
        security: Object exposing ``use_ssl`` and ``ssl_context``.

    Returns:
        A dict with ``security_protocol`` (``"SASL_SSL"`` when ``use_ssl``
        is truthy, else ``"SASL_PLAINTEXT"``), ``ssl_context`` passed
        through unchanged, and ``sasl_mechanism`` fixed to ``"GSSAPI"``.
    """
    options: dict[str, Any] = {
        "security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
        "ssl_context": security.ssl_context,
    }
    options["sasl_mechanism"] = "GSSAPI"
    return options