Coverage for faststream / redis / publisher / specification.py: 93%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from faststream._internal.endpoint.publisher import PublisherSpecification 

2from faststream.redis.configs import RedisBrokerConfig 

3from faststream.redis.schemas import ListSub, PubSub, StreamSub 

4from faststream.specification.asyncapi.utils import resolve_payloads 

5from faststream.specification.schema import Message, Operation, PublisherSpec 

6from faststream.specification.schema.bindings import ChannelBinding, redis 

7 

8from .config import RedisPublisherSpecificationConfig 

9 

10 

11class RedisPublisherSpecification( 

12 PublisherSpecification[RedisBrokerConfig, RedisPublisherSpecificationConfig], 

13): 

14 def get_schema(self) -> dict[str, PublisherSpec]: 

15 payloads = self.get_payloads() 

16 

17 return { 

18 self.name: PublisherSpec( 

19 description=self.config.description_, 

20 operation=Operation( 

21 message=Message( 

22 title=f"{self.name}:Message", 

23 payload=resolve_payloads(payloads, "Publisher"), 

24 ), 

25 bindings=None, 

26 ), 

27 bindings=ChannelBinding( 

28 redis=self.channel_binding, 

29 ), 

30 ), 

31 } 

32 

33 @property 

34 def channel_binding(self) -> redis.ChannelBinding: 

35 raise NotImplementedError 

36 

37 

38class ChannelPublisherSpecification(RedisPublisherSpecification): 

39 def __init__( 

40 self, 

41 _outer_config: RedisBrokerConfig, 

42 specification_config: RedisPublisherSpecificationConfig, 

43 channel: PubSub, 

44 ) -> None: 

45 super().__init__(_outer_config, specification_config) 

46 self.channel = channel 

47 

48 @property 

49 def name(self) -> str: 

50 if self.config.title_: 

51 return self.config.title_ 

52 

53 return f"{self.channel_name}:Publisher" 

54 

55 @property 

56 def channel_name(self) -> str: 

57 return f"{self._outer_config.prefix}{self.channel.name}" 

58 

59 @property 

60 def channel_binding(self) -> redis.ChannelBinding: 

61 return redis.ChannelBinding( 

62 channel=self.channel_name, 

63 method="publish", 

64 ) 

65 

66 

67class ListPublisherSpecification(RedisPublisherSpecification): 

68 def __init__( 

69 self, 

70 _outer_config: RedisBrokerConfig, 

71 specification_config: RedisPublisherSpecificationConfig, 

72 list_sub: ListSub, 

73 ) -> None: 

74 super().__init__(_outer_config, specification_config) 

75 self.list_sub = list_sub 

76 

77 @property 

78 def name(self) -> str: 

79 if self.config.title_: 79 ↛ 80line 79 didn't jump to line 80 because the condition on line 79 was never true

80 return self.config.title_ 

81 

82 return f"{self.list_name}:Publisher" 

83 

84 @property 

85 def list_name(self) -> str: 

86 return f"{self._outer_config.prefix}{self.list_sub.name}" 

87 

88 @property 

89 def channel_binding(self) -> redis.ChannelBinding: 

90 return redis.ChannelBinding( 

91 channel=self.list_name, 

92 method="rpush", 

93 ) 

94 

95 

96class StreamPublisherSpecification(RedisPublisherSpecification): 

97 def __init__( 

98 self, 

99 _outer_config: RedisBrokerConfig, 

100 specification_config: RedisPublisherSpecificationConfig, 

101 stream_sub: StreamSub, 

102 ) -> None: 

103 super().__init__(_outer_config, specification_config) 

104 self.stream_sub = stream_sub 

105 

106 @property 

107 def name(self) -> str: 

108 if self.config.title_: 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true

109 return self.config.title_ 

110 

111 return f"{self.stream_name}:Publisher" 

112 

113 @property 

114 def stream_name(self) -> str: 

115 return f"{self._outer_config.prefix}{self.stream_sub.name}" 

116 

117 @property 

118 def channel_binding(self) -> "redis.ChannelBinding": 

119 return redis.ChannelBinding( 

120 channel=self.stream_name, 

121 method="xadd", 

122 )