Coverage for faststream / redis / subscriber / specification.py: 93%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Any 

2 

3from faststream._internal.endpoint.subscriber import SubscriberSpecification 

4from faststream.redis.configs import RedisBrokerConfig 

5from faststream.redis.schemas import ListSub, PubSub, StreamSub 

6from faststream.specification.asyncapi.utils import resolve_payloads 

7from faststream.specification.schema import Message, Operation, SubscriberSpec 

8from faststream.specification.schema.bindings import ChannelBinding, redis 

9 

10from .config import RedisSubscriberSpecificationConfig 

11 

12if TYPE_CHECKING: 

13 from faststream._internal.endpoint.subscriber.call_item import ( 

14 CallsCollection, 

15 ) 

16 

17 

18class RedisSubscriberSpecification( 

19 SubscriberSpecification[RedisBrokerConfig, RedisSubscriberSpecificationConfig], 

20): 

21 def get_schema(self) -> dict[str, SubscriberSpec]: 

22 payloads = self.get_payloads() 

23 

24 return { 

25 self.name: SubscriberSpec( 

26 description=self.description, 

27 operation=Operation( 

28 message=Message( 

29 title=f"{self.name}:Message", 

30 payload=resolve_payloads(payloads), 

31 ), 

32 bindings=None, 

33 ), 

34 bindings=ChannelBinding( 

35 redis=self.channel_binding, 

36 ), 

37 ), 

38 } 

39 

40 @property 

41 def channel_binding(self) -> redis.ChannelBinding: 

42 raise NotImplementedError 

43 

44 

45class ChannelSubscriberSpecification(RedisSubscriberSpecification): 

46 def __init__( 

47 self, 

48 _outer_config: "RedisBrokerConfig", 

49 specification_config: "RedisSubscriberSpecificationConfig", 

50 calls: "CallsCollection[Any]", 

51 channel: PubSub, 

52 ) -> None: 

53 super().__init__(_outer_config, specification_config, calls) 

54 self.channel = channel 

55 

56 @property 

57 def name(self) -> str: 

58 if self.config.title_: 

59 return self.config.title_ 

60 

61 return f"{self.channel_name}:{self.call_name}" 

62 

63 @property 

64 def channel_name(self) -> str: 

65 return f"{self._outer_config.prefix}{self.channel.name}" 

66 

67 @property 

68 def channel_binding(self) -> "redis.ChannelBinding": 

69 return redis.ChannelBinding( 

70 channel=self.channel_name, 

71 method="psubscribe" if self.channel.pattern else "subscribe", 

72 ) 

73 

74 

75class ListSubscriberSpecification(RedisSubscriberSpecification): 

76 def __init__( 

77 self, 

78 _outer_config: "RedisBrokerConfig", 

79 specification_config: "RedisSubscriberSpecificationConfig", 

80 calls: "CallsCollection[Any]", 

81 list_sub: ListSub, 

82 ) -> None: 

83 super().__init__(_outer_config, specification_config, calls) 

84 self.list_sub = list_sub 

85 

86 @property 

87 def name(self) -> str: 

88 if self.config.title_: 88 ↛ 89line 88 didn't jump to line 89 because the condition on line 88 was never true

89 return self.config.title_ 

90 

91 return f"{self.list_name}:{self.call_name}" 

92 

93 @property 

94 def list_name(self) -> str: 

95 return f"{self._outer_config.prefix}{self.list_sub.name}" 

96 

97 @property 

98 def channel_binding(self) -> "redis.ChannelBinding": 

99 return redis.ChannelBinding( 

100 channel=self.list_name, 

101 method="lpop", 

102 ) 

103 

104 

105class StreamSubscriberSpecification(RedisSubscriberSpecification): 

106 def __init__( 

107 self, 

108 _outer_config: "RedisBrokerConfig", 

109 specification_config: "RedisSubscriberSpecificationConfig", 

110 calls: "CallsCollection[Any]", 

111 stream_sub: StreamSub, 

112 ) -> None: 

113 super().__init__(_outer_config, specification_config, calls) 

114 self.stream_sub = stream_sub 

115 

116 @property 

117 def name(self) -> str: 

118 if self.config.title_: 118 ↛ 119line 118 didn't jump to line 119 because the condition on line 118 was never true

119 return self.config.title_ 

120 

121 return f"{self.stream_name}:{self.call_name}" 

122 

123 @property 

124 def stream_name(self) -> str: 

125 return f"{self._outer_config.prefix}{self.stream_sub.name}" 

126 

127 @property 

128 def channel_binding(self) -> "redis.ChannelBinding": 

129 return redis.ChannelBinding( 

130 channel=self.stream_name, 

131 group_name=self.stream_sub.group, 

132 consumer_name=self.stream_sub.consumer, 

133 method="xreadgroup" if self.stream_sub.group else "xread", 

134 )