Coverage for faststream / nats / subscriber / usecases / stream_push_subscriber.py: 58%
15 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Optional
3from nats.aio.msg import Msg
4from typing_extensions import override
6from faststream._internal.endpoint.subscriber.mixins import ConcurrentMixin
8from .stream_basic import StreamSubscriber
10if TYPE_CHECKING:
11 from nats.js import JetStreamContext
14class PushStreamSubscriber(StreamSubscriber):
15 subscription: Optional["JetStreamContext.PushSubscription"]
17 @override
18 async def _create_subscription(self) -> None:
19 """Create NATS subscription and start consume task."""
20 if self.subscription: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true
21 return
23 self.subscription = await self.jetstream.subscribe(
24 subject=self.clear_subject,
25 queue=self.queue,
26 cb=self.consume,
27 config=self.config,
28 **self.extra_options,
29 )
32class ConcurrentPushStreamSubscriber(ConcurrentMixin[Msg], StreamSubscriber):
33 subscription: Optional["JetStreamContext.PushSubscription"]
35 @override
36 async def _create_subscription(self) -> None:
37 """Create NATS subscription and start consume task."""
38 if self.subscription:
39 return
41 self.start_consume_task()
43 self.subscription = await self.jetstream.subscribe(
44 subject=self.clear_subject,
45 queue=self.queue,
46 cb=self._put_msg,
47 config=self.config,
48 **self.extra_options,
49 )