Coverage for faststream / nats / subscriber / usecases / stream_basic.py: 96%
41 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import AsyncIterator
2from typing import TYPE_CHECKING, Any, Optional
4from nats.errors import ConnectionClosedError, TimeoutError
5from typing_extensions import override
7from faststream._internal.endpoint.utils import process_msg
8from faststream.nats.parser import JsParser
10from .basic import DefaultSubscriber
12if TYPE_CHECKING:
13 from nats.aio.msg import Msg
14 from nats.js import JetStreamContext
16 from faststream._internal.endpoint.subscriber import SubscriberSpecification
17 from faststream._internal.endpoint.subscriber.call_item import CallsCollection
18 from faststream.message import StreamMessage
19 from faststream.nats.message import NatsMessage
20 from faststream.nats.schemas import JStream
21 from faststream.nats.subscriber.config import NatsSubscriberConfig
class StreamSubscriber(DefaultSubscriber["Msg"]):
    """Subscriber that reads messages through a JetStream pull subscription.

    Messages are parsed and decoded with a `JsParser` built from the
    configured subject. The pull subscription used by `get_one` and
    `__aiter__` is created lazily on first use and then reused.
    """

    _fetch_sub: Optional["JetStreamContext.PullSubscription"]

    def __init__(
        self,
        config: "NatsSubscriberConfig",
        specification: "SubscriberSpecification[Any, Any]",
        calls: "CallsCollection[Msg]",
        *,
        stream: "JStream",
        queue: str,
    ) -> None:
        """Configure the JetStream parser and store the stream and queue.

        Args:
            config: Subscriber config; its `parser` and `decoder` are
                overwritten with the `JsParser` callables.
            specification: Subscriber specification passed to the base class.
            calls: Handler calls collection passed to the base class.
            stream: Stream this subscriber belongs to.
            queue: Queue name reported in the log context.
        """
        # Install the parser on the config before handing it to the base
        # initializer.
        parser = JsParser(pattern=config.subject)
        config.decoder = parser.decode_message
        config.parser = parser.parse_message
        super().__init__(config, specification, calls)

        self.queue = queue
        self.stream = stream

    def get_log_context(
        self,
        message: Optional["StreamMessage[Msg]"],
    ) -> dict[str, str]:
        """Log context factory used in the `self.consume` scope.

        Args:
            message: Message which we are building context for
        """
        return self.build_log_context(
            message=message,
            subject=self._resolved_subject_string,
            queue=self.queue,
            stream=self.stream.name,
        )

    async def _ensure_fetch_sub(self) -> "JetStreamContext.PullSubscription":
        """Return the pull subscription, creating it on first use.

        Shared by `get_one` and `__aiter__` so both build the subscription
        from exactly the same options.
        """
        if not self._fetch_sub:
            extra_options = {
                "pending_bytes_limit": self.extra_options["pending_bytes_limit"],
                "pending_msgs_limit": self.extra_options["pending_msgs_limit"],
                "durable": self.extra_options["durable"],
                "stream": self.extra_options["stream"],
            }
            # `inbox_prefix` is optional: forward it only when it is set.
            if inbox_prefix := self.extra_options.get("inbox_prefix"):
                extra_options["inbox_prefix"] = inbox_prefix

            self._fetch_sub = await self.jetstream.pull_subscribe(
                subject=self.clear_subject,
                config=self.config,
                **extra_options,
            )

        return self._fetch_sub

    @override
    async def get_one(self, *, timeout: float = 5) -> Optional["NatsMessage"]:
        """Fetch and parse a single message from the pull subscription.

        Args:
            timeout: Timeout passed to the underlying `fetch` call.

        Returns:
            The processed message, or `None` when the fetch raises
            `TimeoutError` or `ConnectionClosedError`.

        Raises:
            AssertionError: If the subscriber has registered handlers.
        """
        assert not self.calls, (
            "You can't use `get_one` method if subscriber has registered handlers."
        )

        fetch_sub = await self._ensure_fetch_sub()

        try:
            raw_message = (
                await fetch_sub.fetch(
                    batch=1,
                    timeout=timeout,
                )
            )[0]
        except (TimeoutError, ConnectionClosedError):
            return None

        context = self._outer_config.fd_config.context
        async_parser, async_decoder = self._get_parser_and_decoder()

        msg: NatsMessage = await process_msg(  # type: ignore[assignment]
            msg=raw_message,
            middlewares=(
                m(raw_message, context=context) for m in self._broker_middlewares
            ),
            parser=async_parser,
            decoder=async_decoder,
        )
        return msg

    @override
    async def __aiter__(self) -> AsyncIterator["NatsMessage"]:  # type: ignore[override]
        """Yield processed messages from the pull subscription indefinitely.

        Each message is fetched one at a time with `timeout=None`. Errors
        raised by the fetch are not caught here and propagate to the caller.

        Raises:
            AssertionError: If the subscriber has registered handlers.
        """
        assert not self.calls, (
            "You can't use iterator if subscriber has registered handlers."
        )

        fetch_sub = await self._ensure_fetch_sub()

        # Loop-invariant lookups are resolved once, before iteration starts.
        context = self._outer_config.fd_config.context
        async_parser, async_decoder = self._get_parser_and_decoder()

        while True:
            raw_message = (
                await fetch_sub.fetch(
                    batch=1,
                    timeout=None,
                )
            )[0]

            msg: NatsMessage = await process_msg(  # type: ignore[assignment]
                msg=raw_message,
                middlewares=(
                    m(raw_message, context=context) for m in self._broker_middlewares
                ),
                parser=async_parser,
                decoder=async_decoder,
            )
            yield msg