Coverage for faststream / redis / subscriber / factory.py: 85%

37 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import warnings 

2from typing import TYPE_CHECKING, Any, TypeAlias, Union 

3 

4from faststream._internal.constants import EMPTY 

5from faststream._internal.endpoint.subscriber.call_item import CallsCollection 

6from faststream.exceptions import SetupError 

7from faststream.middlewares import AckPolicy 

8from faststream.redis.schemas import INCORRECT_SETUP_MSG, ListSub, PubSub, StreamSub 

9from faststream.redis.schemas.proto import validate_options 

10 

11from .config import RedisSubscriberConfig, RedisSubscriberSpecificationConfig 

12from .specification import ( 

13 ChannelSubscriberSpecification, 

14 ListSubscriberSpecification, 

15 RedisSubscriberSpecification, 

16 StreamSubscriberSpecification, 

17) 

18from .usecases import ( 

19 ChannelConcurrentSubscriber, 

20 ChannelSubscriber, 

21 ListBatchSubscriber, 

22 ListConcurrentSubscriber, 

23 ListSubscriber, 

24 LogicSubscriber, 

25 StreamBatchSubscriber, 

26 StreamConcurrentSubscriber, 

27 StreamSubscriber, 

28) 

29 

30if TYPE_CHECKING: 

31 from faststream.redis.configs import RedisBrokerConfig 

32 from faststream.redis.parser import MessageFormat 

33 

34SubscriberType: TypeAlias = LogicSubscriber 

35 

36 

def create_subscriber(
    *,
    channel: Union["PubSub", str, None],
    list: Union["ListSub", str, None],
    stream: Union["StreamSub", str, None],
    # Subscriber args
    ack_policy: "AckPolicy",
    config: "RedisBrokerConfig",
    no_reply: bool = False,
    message_format: type["MessageFormat"] | None,
    # AsyncAPI args
    title_: str | None = None,
    description_: str | None = None,
    include_in_schema: bool = True,
    max_workers: int = 1,
) -> SubscriberType:
    """Build the subscriber use case matching the requested Redis source.

    The arguments are first checked by ``_validate_input_for_misconfigure``.
    The sources are then tried in a fixed order -- channel, stream, list --
    and the first truthy one decides which subscriber class is returned.

    For stream and list sources a truthy ``batch`` attribute wins over
    ``max_workers``: the batch subscriber is returned without receiving
    ``max_workers``. Otherwise ``max_workers > 1`` selects the concurrent
    variant and anything else the plain one.

    Args:
        channel: Pub/Sub source, normalized through ``PubSub.validate``.
        list: List source, normalized through ``ListSub.validate``.
        stream: Stream source, normalized through ``StreamSub.validate``.
        ack_policy: Stored on the subscriber config as ``_ack_policy``; for
            a channel source it is overwritten with ``AckPolicy.MANUAL``.
        config: Broker config, stored as ``_outer_config`` and handed to the
            specification object.
        no_reply: Forwarded to the subscriber config.
        message_format: Forwarded to the subscriber config as
            ``_message_format``.
        title_: Forwarded to the specification config.
        description_: Forwarded to the specification config.
        include_in_schema: Forwarded to the specification config.
        max_workers: Values above 1 select a concurrent subscriber.

    Returns:
        The subscriber instance for the selected source.

    Raises:
        SetupError: With ``INCORRECT_SETUP_MSG`` when no source is truthy
            after validation.
    """
    _validate_input_for_misconfigure(
        channel=channel,
        list=list,
        stream=stream,
        ack_policy=ack_policy,
        max_workers=max_workers,
        message_format=message_format,
    )

    # Normalize the three sources in the same order the config receives them.
    validated_channel = PubSub.validate(channel)
    validated_list = ListSub.validate(list)
    validated_stream = StreamSub.validate(stream)

    sub_cfg = RedisSubscriberConfig(
        channel_sub=validated_channel,
        list_sub=validated_list,
        stream_sub=validated_stream,
        no_reply=no_reply,
        _outer_config=config,
        _ack_policy=ack_policy,
        _message_format=message_format,
    )
    spec_cfg = RedisSubscriberSpecificationConfig(
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )
    handlers = CallsCollection[Any]()
    concurrent = max_workers > 1

    specification: RedisSubscriberSpecification

    # --- Pub/Sub channel -------------------------------------------------
    if channel_source := sub_cfg.channel_sub:
        specification = ChannelSubscriberSpecification(
            config,
            spec_cfg,
            handlers,
            channel=channel_source,
        )

        # Channel subscribers always carry the MANUAL policy, regardless of
        # what the caller passed in.
        sub_cfg._ack_policy = AckPolicy.MANUAL

        if concurrent:
            return ChannelConcurrentSubscriber(
                sub_cfg,
                specification,
                handlers,
                max_workers=max_workers,
            )
        return ChannelSubscriber(sub_cfg, specification, handlers)

    # --- Stream ----------------------------------------------------------
    if stream_source := sub_cfg.stream_sub:
        specification = StreamSubscriberSpecification(
            config,
            spec_cfg,
            handlers,
            stream_sub=stream_source,
        )

        if stream_source.batch:
            # TODO: raise warning if max_workers in `_validate_input_for_misconfigure`
            return StreamBatchSubscriber(sub_cfg, specification, handlers)

        if concurrent:
            return StreamConcurrentSubscriber(
                sub_cfg,
                specification,
                handlers,
                max_workers=max_workers,
            )
        return StreamSubscriber(sub_cfg, specification, handlers)

    # --- List ------------------------------------------------------------
    list_source = sub_cfg.list_sub
    if not list_source:
        # None of the three sources survived validation.
        raise SetupError(INCORRECT_SETUP_MSG)

    specification = ListSubscriberSpecification(
        config,
        spec_cfg,
        handlers,
        list_sub=list_source,
    )

    if list_source.batch:
        # TODO: raise warning if max_workers in `_validate_input_for_misconfigure`
        return ListBatchSubscriber(sub_cfg, specification, handlers)

    if concurrent:
        return ListConcurrentSubscriber(
            sub_cfg,
            specification,
            handlers,
            max_workers=max_workers,
        )
    return ListSubscriber(sub_cfg, specification, handlers)

146 

147 

def _validate_input_for_misconfigure(
    *,
    channel: Union["PubSub", str, None],
    list: Union["ListSub", str, None],
    stream: Union["StreamSub", str, None],
    ack_policy: AckPolicy,
    max_workers: int,
    message_format: type["MessageFormat"] | None,
) -> None:
    """Reject or warn about subscriber option combinations that cannot work.

    Checks are applied in this order:

    1. ``validate_options`` is called with the three sources.
    2. A stream source combined with ``AckPolicy.MANUAL`` and
       ``max_workers > 1`` raises ``SetupError``.
    3. When ``ack_policy`` is anything other than the ``EMPTY`` sentinel, a
       ``RuntimeWarning`` is emitted for a truthy ``channel`` and for a
       truthy ``list``.

    ``message_format`` is accepted for signature parity with the caller but
    is not inspected here.

    Raises:
        SetupError: For the stream / manual ack / multiple workers
            combination.
    """
    validate_options(channel=channel, list=list, stream=stream)

    if stream and ack_policy is AckPolicy.MANUAL:
        if max_workers > 1:
            msg = "Max workers not work with manual no_ack mode."
            raise SetupError(msg)

    if ack_policy is EMPTY:
        # No explicit policy was given, so there is nothing to warn about.
        return

    ack_warnings = (
        (channel, "You can't use acknowledgement policy with PubSub subscriber."),
        (list, "You can't use acknowledgement policy with List subscriber."),
    )
    for source, warning_text in ack_warnings:
        if source:
            warnings.warn(
                warning_text,
                RuntimeWarning,
                stacklevel=4,
            )