Coverage for faststream / nats / subscriber / usecases / stream_push_subscriber.py: 58%

15 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Optional 

2 

3from nats.aio.msg import Msg 

4from typing_extensions import override 

5 

6from faststream._internal.endpoint.subscriber.mixins import ConcurrentMixin 

7 

8from .stream_basic import StreamSubscriber 

9 

10if TYPE_CHECKING: 

11 from nats.js import JetStreamContext 

12 

13 

14class PushStreamSubscriber(StreamSubscriber): 

15 subscription: Optional["JetStreamContext.PushSubscription"] 

16 

17 @override 

18 async def _create_subscription(self) -> None: 

19 """Create NATS subscription and start consume task.""" 

20 if self.subscription: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true

21 return 

22 

23 self.subscription = await self.jetstream.subscribe( 

24 subject=self.clear_subject, 

25 queue=self.queue, 

26 cb=self.consume, 

27 config=self.config, 

28 **self.extra_options, 

29 ) 

30 

31 

32class ConcurrentPushStreamSubscriber(ConcurrentMixin[Msg], StreamSubscriber): 

33 subscription: Optional["JetStreamContext.PushSubscription"] 

34 

35 @override 

36 async def _create_subscription(self) -> None: 

37 """Create NATS subscription and start consume task.""" 

38 if self.subscription: 

39 return 

40 

41 self.start_consume_task() 

42 

43 self.subscription = await self.jetstream.subscribe( 

44 subject=self.clear_subject, 

45 queue=self.queue, 

46 cb=self._put_msg, 

47 config=self.config, 

48 **self.extra_options, 

49 )