Coverage for faststream / nats / subscriber / factory.py: 67%
112 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import warnings
2from typing import TYPE_CHECKING, Any, Optional, TypedDict
4from nats.aio.subscription import (
5 DEFAULT_SUB_PENDING_BYTES_LIMIT,
6 DEFAULT_SUB_PENDING_MSGS_LIMIT,
7)
8from nats.js.api import ConsumerConfig, DeliverPolicy
9from nats.js.client import (
10 DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
11 DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
12)
14from faststream._internal.constants import EMPTY
15from faststream._internal.endpoint.subscriber import SubscriberSpecification
16from faststream._internal.endpoint.subscriber.call_item import CallsCollection
17from faststream.exceptions import SetupError
18from faststream.middlewares import AckPolicy
20from .config import NatsSubscriberConfig, NatsSubscriberSpecificationConfig
21from .specification import NatsSubscriberSpecification, NotIncludeSpecifation
22from .usecases import (
23 BatchPullStreamSubscriber,
24 ConcurrentCoreSubscriber,
25 ConcurrentPullStreamSubscriber,
26 ConcurrentPushStreamSubscriber,
27 CoreSubscriber,
28 KeyValueWatchSubscriber,
29 LogicSubscriber,
30 ObjStoreWatchSubscriber,
31 PullStreamSubscriber,
32 PushStreamSubscriber,
33)
35if TYPE_CHECKING:
36 from nats.js import api
38 from faststream.nats.configs import NatsBrokerConfig
39 from faststream.nats.schemas import JStream, KvWatch, ObjWatch, PullSub
42class SharedOptions(TypedDict):
43 config: NatsSubscriberConfig
44 specification: SubscriberSpecification[Any, Any]
45 calls: CallsCollection[Any]
48def create_subscriber(
49 *,
50 subject: str,
51 queue: str,
52 pending_msgs_limit: int | None,
53 pending_bytes_limit: int | None,
54 # Core args
55 max_msgs: int,
56 # JS args
57 durable: str | None,
58 config: Optional["api.ConsumerConfig"],
59 ordered_consumer: bool,
60 idle_heartbeat: float | None,
61 flow_control: bool | None,
62 deliver_policy: Optional["api.DeliverPolicy"],
63 headers_only: bool | None,
64 # pull args
65 pull_sub: Optional["PullSub"],
66 kv_watch: Optional["KvWatch"],
67 obj_watch: Optional["ObjWatch"],
68 inbox_prefix: bytes,
69 # custom args
70 max_workers: int,
71 stream: Optional["JStream"],
72 # Subscriber args
73 ack_policy: "AckPolicy",
74 no_reply: bool,
75 broker_config: "NatsBrokerConfig",
76 # Specification information
77 title_: str | None,
78 description_: str | None,
79 include_in_schema: bool,
80) -> "LogicSubscriber[Any]":
81 _validate_input_for_misconfigure(
82 subject=subject,
83 queue=queue,
84 pending_msgs_limit=pending_msgs_limit,
85 pending_bytes_limit=pending_bytes_limit,
86 max_msgs=max_msgs,
87 durable=durable,
88 config=config,
89 ordered_consumer=ordered_consumer,
90 idle_heartbeat=idle_heartbeat,
91 flow_control=flow_control,
92 deliver_policy=deliver_policy,
93 headers_only=headers_only,
94 pull_sub=pull_sub,
95 ack_policy=ack_policy,
96 kv_watch=kv_watch,
97 obj_watch=obj_watch,
98 max_workers=max_workers,
99 stream=stream,
100 )
102 config = config or ConsumerConfig(filter_subjects=[])
103 if config.durable_name is None: 103 ↛ 105line 103 didn't jump to line 105 because the condition on line 103 was always true
104 config.durable_name = durable
105 if config.idle_heartbeat is None: 105 ↛ 107line 105 didn't jump to line 107 because the condition on line 105 was always true
106 config.idle_heartbeat = idle_heartbeat
107 if config.headers_only is None: 107 ↛ 109line 107 didn't jump to line 109 because the condition on line 107 was always true
108 config.headers_only = headers_only
109 if config.deliver_policy is DeliverPolicy.ALL: 109 ↛ 112line 109 didn't jump to line 112 because the condition on line 109 was always true
110 config.deliver_policy = deliver_policy or DeliverPolicy.ALL
112 if stream:
113 # Both JS Subscribers
114 extra_options: dict[str, Any] = {
115 "pending_msgs_limit": pending_msgs_limit or DEFAULT_JS_SUB_PENDING_MSGS_LIMIT,
116 "pending_bytes_limit": pending_bytes_limit
117 or DEFAULT_JS_SUB_PENDING_BYTES_LIMIT,
118 "durable": durable,
119 "stream": stream.name,
120 }
122 if pull_sub is not None:
123 # JS Pull Subscriber
124 extra_options.update({"inbox_prefix": inbox_prefix})
126 else:
127 # JS Push Subscriber
128 if ack_policy is AckPolicy.ACK_FIRST: 128 ↛ 129line 128 didn't jump to line 129 because the condition on line 128 was never true
129 manual_ack = False
130 ack_policy = AckPolicy.MANUAL
131 else:
132 manual_ack = True
134 extra_options.update(
135 {
136 "ordered_consumer": ordered_consumer,
137 "idle_heartbeat": idle_heartbeat,
138 "flow_control": flow_control,
139 "deliver_policy": deliver_policy,
140 "headers_only": headers_only,
141 "manual_ack": manual_ack,
142 },
143 )
145 else:
146 # Core Subscriber
147 extra_options = {
148 "pending_msgs_limit": pending_msgs_limit or DEFAULT_SUB_PENDING_MSGS_LIMIT,
149 "pending_bytes_limit": pending_bytes_limit or DEFAULT_SUB_PENDING_BYTES_LIMIT,
150 "max_msgs": max_msgs,
151 }
153 subscriber_config = NatsSubscriberConfig(
154 subject=subject,
155 sub_config=config,
156 extra_options=extra_options,
157 no_reply=no_reply,
158 _outer_config=broker_config,
159 _ack_policy=ack_policy,
160 )
162 calls = CallsCollection[Any]()
164 specification_config = NatsSubscriberSpecificationConfig(
165 subject=subject,
166 queue=queue or None,
167 title_=title_,
168 description_=description_,
169 include_in_schema=include_in_schema,
170 )
172 specification = NatsSubscriberSpecification(
173 _outer_config=broker_config,
174 calls=calls,
175 specification_config=specification_config,
176 )
178 not_include_spec = NotIncludeSpecifation(
179 _outer_config=broker_config,
180 calls=calls,
181 specification_config=specification_config,
182 )
184 subscriber_options: SharedOptions = {
185 "config": subscriber_config,
186 "specification": specification,
187 "calls": calls,
188 }
190 if obj_watch is not None:
191 return ObjStoreWatchSubscriber(
192 **(subscriber_options | {"specification": not_include_spec}),
193 obj_watch=obj_watch,
194 )
196 if kv_watch is not None:
197 return KeyValueWatchSubscriber(
198 **(subscriber_options | {"specification": not_include_spec}),
199 kv_watch=kv_watch,
200 )
202 if stream is None:
203 if max_workers > 1:
204 return ConcurrentCoreSubscriber(
205 **subscriber_options,
206 max_workers=max_workers,
207 queue=queue,
208 )
210 return CoreSubscriber(
211 **subscriber_options,
212 queue=queue,
213 )
215 if max_workers > 1: 215 ↛ 216line 215 didn't jump to line 216 because the condition on line 215 was never true
216 if pull_sub is not None:
217 return ConcurrentPullStreamSubscriber(
218 **subscriber_options,
219 max_workers=max_workers,
220 queue=queue,
221 stream=stream,
222 pull_sub=pull_sub,
223 )
225 return ConcurrentPushStreamSubscriber(
226 **subscriber_options,
227 max_workers=max_workers,
228 queue=queue,
229 stream=stream,
230 )
232 if pull_sub is not None:
233 if pull_sub.batch:
234 return BatchPullStreamSubscriber(
235 **subscriber_options,
236 pull_sub=pull_sub,
237 stream=stream,
238 )
240 return PullStreamSubscriber(
241 **subscriber_options,
242 queue=queue,
243 pull_sub=pull_sub,
244 stream=stream,
245 )
247 return PushStreamSubscriber(
248 **subscriber_options,
249 queue=queue,
250 stream=stream,
251 )
254def _validate_input_for_misconfigure( # noqa: PLR0915
255 subject: str,
256 queue: str, # default ""
257 pending_msgs_limit: int | None,
258 pending_bytes_limit: int | None,
259 max_msgs: int, # default 0
260 durable: str | None,
261 config: Optional["api.ConsumerConfig"],
262 ordered_consumer: bool, # default False
263 idle_heartbeat: float | None,
264 flow_control: bool | None,
265 deliver_policy: Optional["api.DeliverPolicy"],
266 headers_only: bool | None,
267 pull_sub: Optional["PullSub"],
268 kv_watch: Optional["KvWatch"],
269 obj_watch: Optional["ObjWatch"],
270 ack_policy: "AckPolicy", # default EMPTY
271 max_workers: int, # default 1
272 stream: Optional["JStream"],
273) -> None:
274 if ack_policy is not EMPTY:
275 if obj_watch is not None: 275 ↛ 276line 275 didn't jump to line 276 because the condition on line 275 was never true
276 warnings.warn(
277 "You can't use acknowledgement policy with ObjectStorage watch subscriber.",
278 RuntimeWarning,
279 stacklevel=4,
280 )
282 elif kv_watch is not None: 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true
283 warnings.warn(
284 "You can't use acknowledgement policy with KeyValue watch subscriber.",
285 RuntimeWarning,
286 stacklevel=4,
287 )
289 elif stream is None and ack_policy is not AckPolicy.MANUAL: 289 ↛ 290line 289 didn't jump to line 290 because the condition on line 289 was never true
290 warnings.warn(
291 (
292 "Core subscriber supports only `ack_policy=AckPolicy.MANUAL` option for very specific cases. "
293 "If you are using different option, probably, you should use JetStream Subscriber instead."
294 ),
295 RuntimeWarning,
296 stacklevel=4,
297 )
299 if max_msgs > 0 and any((stream, kv_watch, obj_watch)): 299 ↛ 300line 299 didn't jump to line 300 because the condition on line 299 was never true
300 warnings.warn(
301 "The `max_msgs` option can be used only with a NATS Core Subscriber.",
302 RuntimeWarning,
303 stacklevel=4,
304 )
306 if ack_policy is EMPTY:
307 ack_policy = AckPolicy.REJECT_ON_ERROR
309 if stream and kv_watch: 309 ↛ 310line 309 didn't jump to line 310 because the condition on line 309 was never true
310 msg = "You can't use both the `stream` and `kv_watch` options simultaneously."
311 raise SetupError(msg)
313 if stream and obj_watch: 313 ↛ 314line 313 didn't jump to line 314 because the condition on line 313 was never true
314 msg = "You can't use both the `stream` and `obj_watch` options simultaneously."
315 raise SetupError(msg)
317 if kv_watch and obj_watch: 317 ↛ 318line 317 didn't jump to line 318 because the condition on line 317 was never true
318 msg = "You can't use both the `kv_watch` and `obj_watch` options simultaneously."
319 raise SetupError(msg)
321 if pull_sub and not stream: 321 ↛ 322line 321 didn't jump to line 322 because the condition on line 321 was never true
322 msg = "JetStream Pull Subscriber can only be used with the `stream` option."
323 raise SetupError(msg)
325 if not subject and not config:
326 msg = "You must provide either the `subject` or `config` option."
327 raise SetupError(msg)
329 if not stream:
330 if obj_watch or kv_watch:
331 # Obj/Kv Subscriber
332 if pending_msgs_limit is not None: 332 ↛ 333line 332 didn't jump to line 333 because the condition on line 332 was never true
333 warnings.warn(
334 message="The `pending_msgs_limit` option can be used only with JetStream (Pull/Push) or Core Subscription.",
335 category=RuntimeWarning,
336 stacklevel=4,
337 )
339 if pending_bytes_limit is not None: 339 ↛ 340line 339 didn't jump to line 340 because the condition on line 339 was never true
340 warnings.warn(
341 message="The `pending_bytes_limit` option can be used only with JetStream (Pull/Push) or Core Subscription.",
342 category=RuntimeWarning,
343 stacklevel=4,
344 )
346 if queue: 346 ↛ 347line 346 didn't jump to line 347 because the condition on line 346 was never true
347 warnings.warn(
348 message="The `queue` option can be used only with JetStream Push or Core Subscription.",
349 category=RuntimeWarning,
350 stacklevel=4,
351 )
353 if max_workers > 1: 353 ↛ 354line 353 didn't jump to line 354 because the condition on line 353 was never true
354 warnings.warn(
355 message="The `max_workers` option can be used only with JetStream (Pull/Push) or Core Subscription.",
356 category=RuntimeWarning,
357 stacklevel=4,
358 )
360 # Core/Obj/Kv Subscriber
361 if durable: 361 ↛ 362line 361 didn't jump to line 362 because the condition on line 361 was never true
362 warnings.warn(
363 message="The `durable` option can be used only with JetStream (Pull/Push) Subscription.",
364 category=RuntimeWarning,
365 stacklevel=4,
366 )
368 if config is not None: 368 ↛ 369line 368 didn't jump to line 369 because the condition on line 368 was never true
369 warnings.warn(
370 message="The `config` option can be used only with JetStream (Pull/Push) Subscription.",
371 category=RuntimeWarning,
372 stacklevel=4,
373 )
375 if ordered_consumer: 375 ↛ 376line 375 didn't jump to line 376 because the condition on line 375 was never true
376 warnings.warn(
377 message="The `ordered_consumer` option can be used only with JetStream (Pull/Push) Subscription.",
378 category=RuntimeWarning,
379 stacklevel=4,
380 )
382 if idle_heartbeat is not None: 382 ↛ 383line 382 didn't jump to line 383 because the condition on line 382 was never true
383 warnings.warn(
384 message="The `idle_heartbeat` option can be used only with JetStream (Pull/Push) Subscription.",
385 category=RuntimeWarning,
386 stacklevel=4,
387 )
389 if flow_control: 389 ↛ 390line 389 didn't jump to line 390 because the condition on line 389 was never true
390 warnings.warn(
391 message="The `flow_control` option can be used only with JetStream Push Subscription.",
392 category=RuntimeWarning,
393 stacklevel=4,
394 )
396 if deliver_policy: 396 ↛ 397line 396 didn't jump to line 397 because the condition on line 396 was never true
397 warnings.warn(
398 message="The `deliver_policy` option can be used only with JetStream (Pull/Push) Subscription.",
399 category=RuntimeWarning,
400 stacklevel=4,
401 )
403 if headers_only: 403 ↛ 404line 403 didn't jump to line 404 because the condition on line 403 was never true
404 warnings.warn(
405 message="The `headers_only` option can be used only with JetStream (Pull/Push) Subscription.",
406 category=RuntimeWarning,
407 stacklevel=4,
408 )
410 if ack_policy is AckPolicy.ACK_FIRST: 410 ↛ 411line 410 didn't jump to line 411 because the condition on line 410 was never true
411 warnings.warn(
412 message="The `ack_policy=AckPolicy.ACK_FIRST:` option can be used only with JetStream Push Subscription.",
413 category=RuntimeWarning,
414 stacklevel=4,
415 )
417 # JetStream Subscribers
418 elif pull_sub:
419 if queue: 419 ↛ 420line 419 didn't jump to line 420 because the condition on line 419 was never true
420 warnings.warn(
421 message="The `queue` option has no effect with JetStream Pull Subscription. You probably wanted to use the `durable` option instead.",
422 category=RuntimeWarning,
423 stacklevel=4,
424 )
426 if ordered_consumer: 426 ↛ 427line 426 didn't jump to line 427 because the condition on line 426 was never true
427 warnings.warn(
428 "The `ordered_consumer` option has no effect with JetStream Pull Subscription. It can only be used with JetStream Push Subscription.",
429 RuntimeWarning,
430 stacklevel=4,
431 )
433 if ack_policy is AckPolicy.ACK_FIRST: 433 ↛ 434line 433 didn't jump to line 434 because the condition on line 433 was never true
434 warnings.warn(
435 message="The `ack_policy=AckPolicy.ACK_FIRST` option has no effect with JetStream Pull Subscription. It can only be used with JetStream Push Subscription.",
436 category=RuntimeWarning,
437 stacklevel=4,
438 )
440 if flow_control: 440 ↛ 441line 440 didn't jump to line 441 because the condition on line 440 was never true
441 warnings.warn(
442 message="The `flow_control` option has no effect with JetStream Pull Subscription. It can only be used with JetStream Push Subscription.",
443 category=RuntimeWarning,
444 stacklevel=4,
445 )
447 # JS PushSub
448 elif durable is not None: 448 ↛ 449line 448 didn't jump to line 449 because the condition on line 448 was never true
449 warnings.warn(
450 message="The JetStream Push consumer with the `durable` option can't be scaled horizontally across multiple instances. You probably wanted to use the `queue` option instead. Also, we strongly recommend using the Jetstream PullSubscriber with the `durable` option as the default.",
451 category=RuntimeWarning,
452 stacklevel=4,
453 )