Coverage for faststream / redis / security.py: 93%

19 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import Any 

2 

3from redis.asyncio.connection import Connection 

4 

5from faststream.security import BaseSecurity, SASLPlaintext 

6 

7 

8def parse_security(security: BaseSecurity | None) -> dict[str, Any]: 

9 if security is None: 

10 return {} 

11 

12 if isinstance(security, SASLPlaintext): 

13 return _parse_sasl_plaintext(security) 

14 

15 if isinstance(security, BaseSecurity): 15 ↛ 18line 15 didn't jump to line 18 because the condition on line 15 was always true

16 return _parse_base_security(security) 

17 

18 msg = f"RedisBroker does not support {type(security)}" 

19 raise NotImplementedError(msg) 

20 

21 

def _parse_base_security(security: BaseSecurity) -> dict[str, Any]:
    """Build connection options for a plain ``BaseSecurity`` object.

    Args:
        security: Security settings; only ``use_ssl`` and ``ssl_context``
            are read here.

    Returns:
        An empty dict when ``security.use_ssl`` is falsy. Otherwise a dict
        with a single ``"connection_class"`` entry holding a ``Connection``
        subclass that adds ``security.ssl_context`` to its connection
        arguments under the ``"ssl"`` key.
    """
    if not security.use_ssl:
        return {}

    class SSLConnection(Connection):
        """``Connection`` variant that carries the captured security object."""

        def __init__(
            self,
            _security: BaseSecurity = security,
            **kwargs: Any,
        ) -> None:
            # The default value binds the enclosing ``security`` object, so the
            # class can be instantiated with the usual keyword arguments only.
            self._security = _security
            super().__init__(**kwargs)

        def _connection_arguments(self) -> Any:
            # Start from the parent's arguments, then layer the SSL context on.
            inherited = super()._connection_arguments()  # type: ignore[misc]
            return {**inherited, "ssl": self._security.ssl_context}

    return {"connection_class": SSLConnection}

42 

43 

def _parse_sasl_plaintext(security: SASLPlaintext) -> dict[str, Any]:
    """Build connection options for ``SASLPlaintext`` security.

    Args:
        security: Security settings providing ``username`` and ``password``
            in addition to the base SSL-related attributes.

    Returns:
        The options produced by ``_parse_base_security`` extended with
        ``"username"`` and ``"password"`` entries; the credentials win if
        the base options ever contain the same keys.
    """
    options = _parse_base_security(security)
    credentials = {
        "username": security.username,
        "password": security.password,
    }
    return options | credentials