Coverage for faststream / redis / subscriber / factory.py: 85%
37 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import warnings
2from typing import TYPE_CHECKING, Any, TypeAlias, Union
4from faststream._internal.constants import EMPTY
5from faststream._internal.endpoint.subscriber.call_item import CallsCollection
6from faststream.exceptions import SetupError
7from faststream.middlewares import AckPolicy
8from faststream.redis.schemas import INCORRECT_SETUP_MSG, ListSub, PubSub, StreamSub
9from faststream.redis.schemas.proto import validate_options
11from .config import RedisSubscriberConfig, RedisSubscriberSpecificationConfig
12from .specification import (
13 ChannelSubscriberSpecification,
14 ListSubscriberSpecification,
15 RedisSubscriberSpecification,
16 StreamSubscriberSpecification,
17)
18from .usecases import (
19 ChannelConcurrentSubscriber,
20 ChannelSubscriber,
21 ListBatchSubscriber,
22 ListConcurrentSubscriber,
23 ListSubscriber,
24 LogicSubscriber,
25 StreamBatchSubscriber,
26 StreamConcurrentSubscriber,
27 StreamSubscriber,
28)
30if TYPE_CHECKING:
31 from faststream.redis.configs import RedisBrokerConfig
32 from faststream.redis.parser import MessageFormat
34SubscriberType: TypeAlias = LogicSubscriber
def create_subscriber(
    *,
    channel: Union["PubSub", str, None],
    list: Union["ListSub", str, None],
    stream: Union["StreamSub", str, None],
    # Subscriber args
    ack_policy: "AckPolicy",
    config: "RedisBrokerConfig",
    no_reply: bool = False,
    message_format: type["MessageFormat"] | None,
    # AsyncAPI args
    title_: str | None = None,
    description_: str | None = None,
    include_in_schema: bool = True,
    max_workers: int = 1,
) -> SubscriberType:
    """Build the Redis subscriber that matches the requested source.

    The inputs are first checked by ``_validate_input_for_misconfigure``.
    The source is then picked in a fixed order: channel, then stream, then
    list. Within the stream and list branches a ``batch`` sub wins over
    ``max_workers``; otherwise ``max_workers > 1`` selects the concurrent
    subscriber and anything else the plain one.

    Args:
        channel: Pub/Sub channel, as a ``PubSub`` object or a string.
        list: Redis list, as a ``ListSub`` object or a string.
        stream: Redis stream, as a ``StreamSub`` object or a string.
        ack_policy: Acknowledgement policy stored in the subscriber config.
            The channel branch overwrites it with ``AckPolicy.MANUAL``.
        config: Broker config shared with the subscriber and its
            specification.
        no_reply: Stored as-is in the subscriber config.
        message_format: Message format class stored in the subscriber config.
        title_: Title passed to the specification config.
        description_: Description passed to the specification config.
        include_in_schema: Passed to the specification config.
        max_workers: Values above 1 select a concurrent subscriber
            (non-batch only).

    Returns:
        The subscriber instance for the selected source.

    Raises:
        SetupError: If none of the validated channel, stream or list subs is
            truthy. The validation step may raise as well.
    """
    _validate_input_for_misconfigure(
        channel=channel,
        list=list,
        stream=stream,
        ack_policy=ack_policy,
        max_workers=max_workers,
        message_format=message_format,
    )

    # Runtime settings shared by every subscriber flavour.
    subscriber_config = RedisSubscriberConfig(
        channel_sub=PubSub.validate(channel),
        list_sub=ListSub.validate(list),
        stream_sub=StreamSub.validate(stream),
        no_reply=no_reply,
        _outer_config=config,
        _ack_policy=ack_policy,
        _message_format=message_format,
    )

    # Schema-related settings.
    specification_config = RedisSubscriberSpecificationConfig(
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )

    calls = CallsCollection[Any]()

    spec: RedisSubscriberSpecification

    # --- Pub/Sub channel ---------------------------------------------------
    if channel_sub := subscriber_config.channel_sub:
        spec = ChannelSubscriberSpecification(
            config,
            specification_config,
            calls,
            channel=channel_sub,
        )

        # The channel branch always replaces the requested policy with MANUAL.
        subscriber_config._ack_policy = AckPolicy.MANUAL

        if max_workers > 1:
            return ChannelConcurrentSubscriber(
                subscriber_config,
                spec,
                calls,
                max_workers=max_workers,
            )
        return ChannelSubscriber(subscriber_config, spec, calls)

    # --- Stream ------------------------------------------------------------
    if stream_sub := subscriber_config.stream_sub:
        spec = StreamSubscriberSpecification(
            config,
            specification_config,
            calls,
            stream_sub=stream_sub,
        )

        if stream_sub.batch:
            # Batch subscribers are built without `max_workers`; any misuse
            # warning belongs in `_validate_input_for_misconfigure`.
            return StreamBatchSubscriber(subscriber_config, spec, calls)

        if max_workers > 1:
            return StreamConcurrentSubscriber(
                subscriber_config,
                spec,
                calls,
                max_workers=max_workers,
            )
        return StreamSubscriber(subscriber_config, spec, calls)

    # --- List --------------------------------------------------------------
    list_sub = subscriber_config.list_sub
    if not list_sub:
        # No usable source survived validation.
        raise SetupError(INCORRECT_SETUP_MSG)

    spec = ListSubscriberSpecification(
        config,
        specification_config,
        calls,
        list_sub=list_sub,
    )

    if list_sub.batch:
        # Batch subscribers are built without `max_workers`; any misuse
        # warning belongs in `_validate_input_for_misconfigure`.
        return ListBatchSubscriber(subscriber_config, spec, calls)

    if max_workers > 1:
        return ListConcurrentSubscriber(
            subscriber_config,
            spec,
            calls,
            max_workers=max_workers,
        )
    return ListSubscriber(subscriber_config, spec, calls)
def _validate_input_for_misconfigure(
    *,
    channel: Union["PubSub", str, None],
    list: Union["ListSub", str, None],
    stream: Union["StreamSub", str, None],
    ack_policy: AckPolicy,
    max_workers: int,
    message_format: type["MessageFormat"] | None,
) -> None:
    """Reject or warn about subscriber option combinations that cannot work.

    Checks, in order:

    1. ``validate_options`` is called with the three targets.
    2. A stream with ``AckPolicy.MANUAL`` and ``max_workers > 1`` raises.
    3. A batch list/stream combined with ``max_workers > 1`` warns, because
       ``create_subscriber`` builds batch subscribers without passing
       ``max_workers`` on.
    4. An explicit ``ack_policy`` (anything but ``EMPTY``) combined with a
       channel or a list warns.

    Args:
        channel: Pub/Sub channel, as a ``PubSub`` object or a string.
        list: Redis list, as a ``ListSub`` object or a string.
        stream: Redis stream, as a ``StreamSub`` object or a string.
        ack_policy: Requested acknowledgement policy, or ``EMPTY``.
        max_workers: Requested number of concurrent workers.
        message_format: Accepted for signature parity with the factory;
            not inspected here.

    Raises:
        SetupError: For a stream with manual acknowledgement and more than
            one worker. ``validate_options`` may raise as well.
    """
    validate_options(channel=channel, list=list, stream=stream)

    if stream and ack_policy is AckPolicy.MANUAL and max_workers > 1:
        msg = "Max workers not work with manual no_ack mode."
        raise SetupError(msg)

    # Plain-string and `None` targets have no `batch` attribute, hence the
    # getattr default. The batch test comes first so `max_workers` is only
    # compared when a batch sub is actually present.
    is_batch = getattr(stream, "batch", False) or getattr(list, "batch", False)
    if is_batch and max_workers > 1:
        warnings.warn(
            "`max_workers` option has no effect on batch subscribers.",
            RuntimeWarning,
            stacklevel=4,
        )

    if ack_policy is not EMPTY:
        if channel:
            warnings.warn(
                "You can't use acknowledgement policy with PubSub subscriber.",
                RuntimeWarning,
                stacklevel=4,
            )

        if list:
            warnings.warn(
                "You can't use acknowledgement policy with List subscriber.",
                RuntimeWarning,
                stacklevel=4,
            )