Coverage for faststream / redis / security.py: 93%
19 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import Any
3from redis.asyncio.connection import Connection
5from faststream.security import BaseSecurity, SASLPlaintext
8def parse_security(security: BaseSecurity | None) -> dict[str, Any]:
9 if security is None:
10 return {}
12 if isinstance(security, SASLPlaintext):
13 return _parse_sasl_plaintext(security)
15 if isinstance(security, BaseSecurity): 15 ↛ 18line 15 didn't jump to line 18 because the condition on line 15 was always true
16 return _parse_base_security(security)
18 msg = f"RedisBroker does not support {type(security)}"
19 raise NotImplementedError(msg)
22def _parse_base_security(security: BaseSecurity) -> dict[str, Any]:
23 if security.use_ssl:
25 class SSLConnection(Connection):
26 def __init__(
27 self,
28 _security: BaseSecurity = security,
29 **kwargs: Any,
30 ) -> None:
31 self._security = _security
32 super().__init__(**kwargs)
34 def _connection_arguments(self) -> Any:
35 return {
36 **super()._connection_arguments(), # type: ignore[misc]
37 "ssl": self._security.ssl_context,
38 }
40 return {"connection_class": SSLConnection}
41 return {}
44def _parse_sasl_plaintext(security: SASLPlaintext) -> dict[str, Any]:
45 return {
46 **_parse_base_security(security),
47 "username": security.username,
48 "password": security.password,
49 }