Coverage for faststream / nats / subscriber / usecases / core_subscriber.py: 84%
47 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import AsyncIterator
2from typing import TYPE_CHECKING, Any, Optional
4from nats.errors import TimeoutError
5from typing_extensions import override
7from faststream._internal.endpoint.subscriber.mixins import ConcurrentMixin
8from faststream._internal.endpoint.utils import process_msg
9from faststream.nats.parser import NatsParser
11from .basic import DefaultSubscriber
13if TYPE_CHECKING:
14 from nats.aio.msg import Msg
15 from nats.aio.subscription import Subscription
17 from faststream._internal.endpoint.subscriber import SubscriberSpecification
18 from faststream._internal.endpoint.subscriber.call_item import CallsCollection
19 from faststream.message import StreamMessage
20 from faststream.nats.message import NatsMessage
21 from faststream.nats.subscriber.config import NatsSubscriberConfig
class CoreSubscriber(DefaultSubscriber["Msg"]):
    """Subscriber backed by plain `connection.subscribe` subscriptions.

    Messages are parsed with a `NatsParser` created with
    `is_ack_disabled=True`, because a core subscriber has no ack policy.

    Two independent subscriptions may be held:

    * `subscription` - created by `_create_subscription`, delivers messages
      to `self.consume` via callback.
    * `_fetch_sub` - created lazily by `get_one` / iteration, used to pull
      messages manually when no handlers are registered.
    """

    subscription: Optional["Subscription"]
    _fetch_sub: Optional["Subscription"]

    def __init__(
        self,
        config: "NatsSubscriberConfig",
        specification: "SubscriberSpecification[Any, Any]",
        calls: "CallsCollection[Msg]",
        *,
        queue: str,
    ) -> None:
        """Configure the parser/decoder on `config` and initialize the base.

        Args:
            config: Subscriber config; its `parser` and `decoder` attributes
                are overwritten with the `NatsParser` bound methods.
            specification: Subscriber specification passed to the base class.
            calls: Handler calls collection passed to the base class.
            queue: Queue name forwarded to every `connection.subscribe` call.
        """
        parser = NatsParser(
            pattern=config.subject,
            is_ack_disabled=True,  # core subscriber has no ack policy
        )
        config.parser = parser.parse_message
        config.decoder = parser.decode_message
        super().__init__(config, specification, calls)

        self.queue = queue

    async def _ensure_fetch_subscription(self) -> "Subscription":
        """Return the manual-fetch subscription, creating it on first use."""
        fetch_sub = self._fetch_sub
        if fetch_sub is None:
            fetch_sub = self._fetch_sub = await self.connection.subscribe(
                subject=self.clear_subject,
                queue=self.queue,
                **self.extra_options,
            )
        return fetch_sub

    @override
    async def get_one(
        self,
        *,
        timeout: float = 5.0,
    ) -> "NatsMessage | None":
        """Fetch and process a single message without using handlers.

        Args:
            timeout: Forwarded as `timeout` to the subscription's `next_msg`.

        Returns:
            The processed message, or `None` if `next_msg` raised
            `TimeoutError`.

        Raises:
            AssertionError: If the subscriber has registered handlers.
        """
        assert not self.calls, (
            "You can't use `get_one` method if subscriber has registered handlers."
        )

        fetch_sub = await self._ensure_fetch_subscription()

        try:
            raw_message = await fetch_sub.next_msg(timeout=timeout)
        except TimeoutError:
            return None

        context = self._outer_config.fd_config.context

        async_parser, async_decoder = self._get_parser_and_decoder()

        msg: NatsMessage = await process_msg(  # type: ignore[assignment]
            msg=raw_message,
            middlewares=(
                m(raw_message, context=context) for m in self._broker_middlewares
            ),
            parser=async_parser,
            decoder=async_decoder,
        )
        return msg

    @override
    async def __aiter__(self) -> AsyncIterator["NatsMessage"]:  # type: ignore[override]
        """Iterate over processed messages without using handlers.

        Yields:
            Each message from the fetch subscription's `messages` stream
            after it has been passed through `process_msg`.

        Raises:
            AssertionError: If the subscriber has registered handlers.
        """
        assert not self.calls, (
            "You can't use iterator if subscriber has registered handlers."
        )

        fetch_sub = await self._ensure_fetch_subscription()

        # Resolved once up front: these do not depend on the individual message.
        context = self._outer_config.fd_config.context
        async_parser, async_decoder = self._get_parser_and_decoder()

        async for raw_message in fetch_sub.messages:
            msg: NatsMessage = await process_msg(  # type: ignore[assignment]
                msg=raw_message,
                middlewares=(
                    m(raw_message, context=context) for m in self._broker_middlewares
                ),
                parser=async_parser,
                decoder=async_decoder,
            )
            yield msg

    async def _create_subscription(self) -> None:
        """Create the NATS subscription that delivers messages to `self.consume`.

        Does nothing if `self.subscription` is already set.
        """
        if self.subscription:
            return

        self.subscription = await self.connection.subscribe(
            subject=self.clear_subject,
            queue=self.queue,
            cb=self.consume,
            **self.extra_options,
        )

    def get_log_context(
        self,
        message: Optional["StreamMessage[Msg]"],
    ) -> dict[str, str]:
        """Log context factory used in `self.consume` scope.

        Args:
            message: Message which we are building context for

        Returns:
            The result of `build_log_context` for this subscriber's
            subject and queue.
        """
        return self.build_log_context(
            message=message,
            subject=self.subject,
            queue=self.queue,
        )
class ConcurrentCoreSubscriber(ConcurrentMixin["Msg"], CoreSubscriber):
    """Core subscriber variant that routes incoming messages to `_put_msg`.

    NOTE(review): `start_consume_task` and `_put_msg` are presumably provided
    by `ConcurrentMixin` - confirm against the mixin.
    """

    @override
    async def _create_subscription(self) -> None:
        """Start the consume task, then subscribe with `_put_msg` as callback.

        Does nothing if `self.subscription` is already set.
        """
        if not self.subscription:
            # The consume task is started before subscribing.
            self.start_consume_task()

            self.subscription = await self.connection.subscribe(
                subject=self.clear_subject,
                queue=self.queue,
                cb=self._put_msg,
                **self.extra_options,
            )