Coverage for faststream / nats / security.py: 35%

11 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import Any 

2 

3from faststream.security import BaseSecurity, SASLPlaintext 

4 

5 

def parse_security(security: BaseSecurity | None) -> dict[str, Any]:
    """Translate a security object into a dict of connection options.

    Args:
        security: The security settings to convert, or ``None`` when no
            security is configured.

    Returns:
        An empty dict for ``None``; otherwise the dict produced by the
        builder matching the object's type.

    Raises:
        NotImplementedError: If ``security`` is neither a ``SASLPlaintext``
            nor a ``BaseSecurity`` instance.
    """
    if security is None:
        return {}

    # Order matters: SASLPlaintext is tested before BaseSecurity, so an
    # object matching both checks is handled by the SASL builder.
    # NOTE(review): presumably SASLPlaintext subclasses BaseSecurity — confirm.
    builders = (
        (SASLPlaintext, _parse_sasl_plaintext),
        (BaseSecurity, _parse_base_security),
    )
    for security_type, build in builders:
        if isinstance(security, security_type):
            return build(security)

    raise NotImplementedError(f"NatsBroker does not support {type(security)}")

15 

16 

def _parse_base_security(security: BaseSecurity) -> dict[str, Any]:
    """Build the options dict for a plain ``BaseSecurity`` object.

    Args:
        security: Object whose ``ssl_context`` attribute is read.

    Returns:
        A one-entry dict mapping ``"tls"`` to ``security.ssl_context``.
    """
    tls_context = security.ssl_context
    return dict(tls=tls_context)

21 

22 

def _parse_sasl_plaintext(security: SASLPlaintext) -> dict[str, Any]:
    """Build the options dict for a ``SASLPlaintext`` security object.

    Args:
        security: Object whose ``ssl_context``, ``username`` and
            ``password`` attributes are read.

    Returns:
        A dict with the keys ``"tls"``, ``"user"`` and ``"password"``,
        in that order, filled from the corresponding attributes.
    """
    tls_context = security.ssl_context
    user = security.username
    password = security.password
    return dict(tls=tls_context, user=user, password=password)

28 }