Coverage for faststream / nats / subscriber / factory.py: 67%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import warnings 

2from typing import TYPE_CHECKING, Any, Optional, TypedDict 

3 

4from nats.aio.subscription import ( 

5 DEFAULT_SUB_PENDING_BYTES_LIMIT, 

6 DEFAULT_SUB_PENDING_MSGS_LIMIT, 

7) 

8from nats.js.api import ConsumerConfig, DeliverPolicy 

9from nats.js.client import ( 

10 DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, 

11 DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, 

12) 

13 

14from faststream._internal.constants import EMPTY 

15from faststream._internal.endpoint.subscriber import SubscriberSpecification 

16from faststream._internal.endpoint.subscriber.call_item import CallsCollection 

17from faststream.exceptions import SetupError 

18from faststream.middlewares import AckPolicy 

19 

20from .config import NatsSubscriberConfig, NatsSubscriberSpecificationConfig 

21from .specification import NatsSubscriberSpecification, NotIncludeSpecifation 

22from .usecases import ( 

23 BatchPullStreamSubscriber, 

24 ConcurrentCoreSubscriber, 

25 ConcurrentPullStreamSubscriber, 

26 ConcurrentPushStreamSubscriber, 

27 CoreSubscriber, 

28 KeyValueWatchSubscriber, 

29 LogicSubscriber, 

30 ObjStoreWatchSubscriber, 

31 PullStreamSubscriber, 

32 PushStreamSubscriber, 

33) 

34 

35if TYPE_CHECKING: 

36 from nats.js import api 

37 

38 from faststream.nats.configs import NatsBrokerConfig 

39 from faststream.nats.schemas import JStream, KvWatch, ObjWatch, PullSub 

40 

41 

class SharedOptions(TypedDict):
    """Keyword arguments that ``create_subscriber`` passes to every subscriber class."""

    # Runtime configuration of the subscriber.
    config: NatsSubscriberConfig
    # Specification object handed to the subscriber; ``create_subscriber``
    # swaps it for a ``NotIncludeSpecifation`` for KV/Object-store watchers.
    specification: SubscriberSpecification[Any, Any]
    # Call collection; ``create_subscriber`` gives the same instance to the
    # subscriber and to its specification objects.
    calls: CallsCollection[Any]

46 

47 

def create_subscriber(
    *,
    subject: str,
    queue: str,
    pending_msgs_limit: int | None,
    pending_bytes_limit: int | None,
    # Core args
    max_msgs: int,
    # JS args
    durable: str | None,
    config: Optional["api.ConsumerConfig"],
    ordered_consumer: bool,
    idle_heartbeat: float | None,
    flow_control: bool | None,
    deliver_policy: Optional["api.DeliverPolicy"],
    headers_only: bool | None,
    # pull args
    pull_sub: Optional["PullSub"],
    kv_watch: Optional["KvWatch"],
    obj_watch: Optional["ObjWatch"],
    inbox_prefix: bytes,
    # custom args
    max_workers: int,
    stream: Optional["JStream"],
    # Subscriber args
    ack_policy: "AckPolicy",
    no_reply: bool,
    broker_config: "NatsBrokerConfig",
    # Specification information
    title_: str | None,
    description_: str | None,
    include_in_schema: bool,
) -> "LogicSubscriber[Any]":
    """Validate the options and build the matching NATS subscriber.

    The concrete class is selected in this order:

    1. ``obj_watch`` given -> ``ObjStoreWatchSubscriber``
    2. ``kv_watch`` given  -> ``KeyValueWatchSubscriber``
    3. no ``stream``       -> ``ConcurrentCoreSubscriber`` when
       ``max_workers > 1``, otherwise ``CoreSubscriber``
    4. ``stream`` given    -> a Pull (``pull_sub`` given) or Push subscriber,
       in its concurrent flavour when ``max_workers > 1``; a non-concurrent
       pull subscriber with ``pull_sub.batch`` set becomes a
       ``BatchPullStreamSubscriber``.

    ``pending_msgs_limit`` / ``pending_bytes_limit`` fall back to the Core or
    JetStream defaults only when they are ``None``; an explicit ``0`` is
    forwarded unchanged instead of being replaced by the default.

    Note:
        A caller-supplied ``config`` object is completed *in place*:
        ``durable_name``, ``idle_heartbeat`` and ``headers_only`` are filled
        from the keyword arguments when unset on the config, and
        ``deliver_policy`` is overridden when the config still holds
        ``DeliverPolicy.ALL``.

    Raises:
        SetupError: propagated from ``_validate_input_for_misconfigure`` for
            mutually exclusive or incomplete option combinations.
    """
    _validate_input_for_misconfigure(
        subject=subject,
        queue=queue,
        pending_msgs_limit=pending_msgs_limit,
        pending_bytes_limit=pending_bytes_limit,
        max_msgs=max_msgs,
        durable=durable,
        config=config,
        ordered_consumer=ordered_consumer,
        idle_heartbeat=idle_heartbeat,
        flow_control=flow_control,
        deliver_policy=deliver_policy,
        headers_only=headers_only,
        pull_sub=pull_sub,
        ack_policy=ack_policy,
        kv_watch=kv_watch,
        obj_watch=obj_watch,
        max_workers=max_workers,
        stream=stream,
    )

    # Values already present on the consumer config win over the keyword
    # arguments; the keyword arguments only fill the gaps.
    config = config or ConsumerConfig(filter_subjects=[])
    if config.durable_name is None:
        config.durable_name = durable
    if config.idle_heartbeat is None:
        config.idle_heartbeat = idle_heartbeat
    if config.headers_only is None:
        config.headers_only = headers_only
    if config.deliver_policy is DeliverPolicy.ALL:
        config.deliver_policy = deliver_policy or DeliverPolicy.ALL

    if stream:
        # Both JS Subscribers
        extra_options: dict[str, Any] = {
            # `is None` (not `or`) so that an explicit 0 is not silently
            # replaced by the default limit.
            "pending_msgs_limit": (
                DEFAULT_JS_SUB_PENDING_MSGS_LIMIT
                if pending_msgs_limit is None
                else pending_msgs_limit
            ),
            "pending_bytes_limit": (
                DEFAULT_JS_SUB_PENDING_BYTES_LIMIT
                if pending_bytes_limit is None
                else pending_bytes_limit
            ),
            "durable": durable,
            "stream": stream.name,
        }

        if pull_sub is not None:
            # JS Pull Subscriber
            extra_options["inbox_prefix"] = inbox_prefix

        else:
            # JS Push Subscriber
            if ack_policy is AckPolicy.ACK_FIRST:
                # ACK_FIRST is expressed through `manual_ack=False`; the
                # subscriber itself is then configured with MANUAL.
                manual_ack = False
                ack_policy = AckPolicy.MANUAL
            else:
                manual_ack = True

            extra_options.update(
                {
                    "ordered_consumer": ordered_consumer,
                    "idle_heartbeat": idle_heartbeat,
                    "flow_control": flow_control,
                    "deliver_policy": deliver_policy,
                    "headers_only": headers_only,
                    "manual_ack": manual_ack,
                },
            )

    else:
        # Core Subscriber
        extra_options = {
            "pending_msgs_limit": (
                DEFAULT_SUB_PENDING_MSGS_LIMIT
                if pending_msgs_limit is None
                else pending_msgs_limit
            ),
            "pending_bytes_limit": (
                DEFAULT_SUB_PENDING_BYTES_LIMIT
                if pending_bytes_limit is None
                else pending_bytes_limit
            ),
            "max_msgs": max_msgs,
        }

    subscriber_config = NatsSubscriberConfig(
        subject=subject,
        sub_config=config,
        extra_options=extra_options,
        no_reply=no_reply,
        _outer_config=broker_config,
        _ack_policy=ack_policy,
    )

    calls = CallsCollection[Any]()

    specification_config = NatsSubscriberSpecificationConfig(
        subject=subject,
        queue=queue or None,
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )

    specification = NatsSubscriberSpecification(
        _outer_config=broker_config,
        calls=calls,
        specification_config=specification_config,
    )

    # Used instead of `specification` by the KV / Object-store watchers below.
    not_include_spec = NotIncludeSpecifation(
        _outer_config=broker_config,
        calls=calls,
        specification_config=specification_config,
    )

    subscriber_options: SharedOptions = {
        "config": subscriber_config,
        "specification": specification,
        "calls": calls,
    }

    if obj_watch is not None:
        return ObjStoreWatchSubscriber(
            **(subscriber_options | {"specification": not_include_spec}),
            obj_watch=obj_watch,
        )

    if kv_watch is not None:
        return KeyValueWatchSubscriber(
            **(subscriber_options | {"specification": not_include_spec}),
            kv_watch=kv_watch,
        )

    if stream is None:
        if max_workers > 1:
            return ConcurrentCoreSubscriber(
                **subscriber_options,
                max_workers=max_workers,
                queue=queue,
            )

        return CoreSubscriber(
            **subscriber_options,
            queue=queue,
        )

    if max_workers > 1:
        if pull_sub is not None:
            return ConcurrentPullStreamSubscriber(
                **subscriber_options,
                max_workers=max_workers,
                queue=queue,
                stream=stream,
                pull_sub=pull_sub,
            )

        return ConcurrentPushStreamSubscriber(
            **subscriber_options,
            max_workers=max_workers,
            queue=queue,
            stream=stream,
        )

    if pull_sub is not None:
        if pull_sub.batch:
            return BatchPullStreamSubscriber(
                **subscriber_options,
                pull_sub=pull_sub,
                stream=stream,
            )

        return PullStreamSubscriber(
            **subscriber_options,
            queue=queue,
            pull_sub=pull_sub,
            stream=stream,
        )

    return PushStreamSubscriber(
        **subscriber_options,
        queue=queue,
        stream=stream,
    )

252 

253 

def _validate_input_for_misconfigure(  # noqa: PLR0915
    subject: str,
    queue: str,  # default ""
    pending_msgs_limit: int | None,
    pending_bytes_limit: int | None,
    max_msgs: int,  # default 0
    durable: str | None,
    config: Optional["api.ConsumerConfig"],
    ordered_consumer: bool,  # default False
    idle_heartbeat: float | None,
    flow_control: bool | None,
    deliver_policy: Optional["api.DeliverPolicy"],
    headers_only: bool | None,
    pull_sub: Optional["PullSub"],
    kv_watch: Optional["KvWatch"],
    obj_watch: Optional["ObjWatch"],
    ack_policy: "AckPolicy",  # default EMPTY
    max_workers: int,  # default 1
    stream: Optional["JStream"],
) -> None:
    """Check a subscriber option set for contradictory or ineffective options.

    Options that are merely ignored by the selected subscriber kind produce a
    ``RuntimeWarning``; combinations that cannot be satisfied raise.

    Raises:
        SetupError: if ``stream`` is combined with ``kv_watch`` or
            ``obj_watch``, if ``kv_watch`` and ``obj_watch`` are both given,
            if ``pull_sub`` is given without ``stream``, or if neither
            ``subject`` nor ``config`` is provided.
    """
    # Shared by every warning below. Passed straight to `warnings.warn` (no
    # wrapper function) so that no extra frame is added and `stacklevel`
    # keeps its meaning.
    # NOTE(review): stacklevel=4 presumably targets the user's subscriber
    # registration call - confirm against the call chain.
    warn_opts: dict[str, Any] = {"category": RuntimeWarning, "stacklevel": 4}

    if ack_policy is not EMPTY:
        if obj_watch is not None:
            warnings.warn(
                "You can't use acknowledgement policy with ObjectStorage watch subscriber.",
                **warn_opts,
            )

        elif kv_watch is not None:
            warnings.warn(
                "You can't use acknowledgement policy with KeyValue watch subscriber.",
                **warn_opts,
            )

        elif stream is None and ack_policy is not AckPolicy.MANUAL:
            warnings.warn(
                (
                    "Core subscriber supports only `ack_policy=AckPolicy.MANUAL` option for very specific cases. "
                    "If you are using different option, probably, you should use JetStream Subscriber instead."
                ),
                **warn_opts,
            )

    if max_msgs > 0 and any((stream, kv_watch, obj_watch)):
        warnings.warn(
            "The `max_msgs` option can be used only with a NATS Core Subscriber.",
            **warn_opts,
        )

    # Local normalisation only: the caller's `ack_policy` value is untouched.
    if ack_policy is EMPTY:
        ack_policy = AckPolicy.REJECT_ON_ERROR

    if stream and kv_watch:
        msg = "You can't use both the `stream` and `kv_watch` options simultaneously."
        raise SetupError(msg)

    if stream and obj_watch:
        msg = "You can't use both the `stream` and `obj_watch` options simultaneously."
        raise SetupError(msg)

    if kv_watch and obj_watch:
        msg = "You can't use both the `kv_watch` and `obj_watch` options simultaneously."
        raise SetupError(msg)

    if pull_sub and not stream:
        msg = "JetStream Pull Subscriber can only be used with the `stream` option."
        raise SetupError(msg)

    if not subject and not config:
        msg = "You must provide either the `subject` or `config` option."
        raise SetupError(msg)

    if not stream:
        if obj_watch or kv_watch:
            # Obj/Kv Subscriber
            if pending_msgs_limit is not None:
                warnings.warn(
                    "The `pending_msgs_limit` option can be used only with JetStream (Pull/Push) or Core Subscription.",
                    **warn_opts,
                )

            if pending_bytes_limit is not None:
                warnings.warn(
                    "The `pending_bytes_limit` option can be used only with JetStream (Pull/Push) or Core Subscription.",
                    **warn_opts,
                )

            if queue:
                warnings.warn(
                    "The `queue` option can be used only with JetStream Push or Core Subscription.",
                    **warn_opts,
                )

            if max_workers > 1:
                warnings.warn(
                    "The `max_workers` option can be used only with JetStream (Pull/Push) or Core Subscription.",
                    **warn_opts,
                )

        # Core/Obj/Kv Subscriber
        if durable:
            warnings.warn(
                "The `durable` option can be used only with JetStream (Pull/Push) Subscription.",
                **warn_opts,
            )

        if config is not None:
            warnings.warn(
                "The `config` option can be used only with JetStream (Pull/Push) Subscription.",
                **warn_opts,
            )

        if ordered_consumer:
            warnings.warn(
                "The `ordered_consumer` option can be used only with JetStream (Pull/Push) Subscription.",
                **warn_opts,
            )

        if idle_heartbeat is not None:
            warnings.warn(
                "The `idle_heartbeat` option can be used only with JetStream (Pull/Push) Subscription.",
                **warn_opts,
            )

        if flow_control:
            warnings.warn(
                "The `flow_control` option can be used only with JetStream Push Subscription.",
                **warn_opts,
            )

        if deliver_policy:
            warnings.warn(
                "The `deliver_policy` option can be used only with JetStream (Pull/Push) Subscription.",
                **warn_opts,
            )

        if headers_only:
            warnings.warn(
                "The `headers_only` option can be used only with JetStream (Pull/Push) Subscription.",
                **warn_opts,
            )

        if ack_policy is AckPolicy.ACK_FIRST:
            # Message fixed: the option name used to carry a stray trailing colon.
            warnings.warn(
                "The `ack_policy=AckPolicy.ACK_FIRST` option can be used only with JetStream Push Subscription.",
                **warn_opts,
            )

    # JetStream Subscribers
    elif pull_sub:
        if queue:
            warnings.warn(
                "The `queue` option has no effect with JetStream Pull Subscription. You probably wanted to use the `durable` option instead.",
                **warn_opts,
            )

        if ordered_consumer:
            warnings.warn(
                "The `ordered_consumer` option has no effect with JetStream Pull Subscription. It can only be used with JetStream Push Subscription.",
                **warn_opts,
            )

        if ack_policy is AckPolicy.ACK_FIRST:
            warnings.warn(
                "The `ack_policy=AckPolicy.ACK_FIRST` option has no effect with JetStream Pull Subscription. It can only be used with JetStream Push Subscription.",
                **warn_opts,
            )

        if flow_control:
            warnings.warn(
                "The `flow_control` option has no effect with JetStream Pull Subscription. It can only be used with JetStream Push Subscription.",
                **warn_opts,
            )

    # JS PushSub
    elif durable is not None:
        warnings.warn(
            "The JetStream Push consumer with the `durable` option can't be scaled horizontally across multiple instances. You probably wanted to use the `queue` option instead. Also, we strongly recommend using the Jetstream PullSubscriber with the `durable` option as the default.",
            **warn_opts,
        )