Coverage for faststream / nats / publisher / producer.py: 94%
70 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from abc import abstractmethod
3from typing import TYPE_CHECKING, Any, Optional
5import anyio
6import nats
7from nats.aio.client import NO_RESPONDERS_STATUS
8from nats.js.api import Header
9from typing_extensions import override
11from faststream._internal.endpoint.utils import ParserComposition
12from faststream._internal.producer import ProducerProto
13from faststream.exceptions import FeatureNotSupportedException
14from faststream.message import encode_message
15from faststream.nats.helpers.state import (
16 ConnectedState,
17 ConnectionState,
18 EmptyConnectionState,
19)
20from faststream.nats.parser import NatsParser
21from faststream.nats.response import NatsPublishCommand
23if TYPE_CHECKING:
24 from fast_depends.library.serializer import SerializerProto
25 from nats.aio.client import Client
26 from nats.aio.msg import Msg
27 from nats.js import JetStreamContext
29 from faststream._internal.types import (
30 AsyncCallable,
31 CustomCallable,
32 )
33 from faststream.nats.schemas import PubAck
class NatsFastProducer(ProducerProto[NatsPublishCommand]):
    """Shared interface of the NATS producers declared in this module.

    ``connect`` and ``disconnect`` do nothing at this level, ``publish`` and
    ``request`` are abstract, and ``publish_batch`` always raises.
    """

    def connect(
        self,
        connection: Any,
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Accept a connection and a serializer; does nothing at this level."""

    def disconnect(self) -> None:
        """Release the connection; does nothing at this level."""

    @abstractmethod
    async def publish(self, cmd: "NatsPublishCommand") -> Optional["PubAck"]:
        """Publish the message described by ``cmd`` (abstract)."""

    @abstractmethod
    async def request(self, cmd: "NatsPublishCommand") -> "Msg":
        """Publish ``cmd`` and return the reply message (abstract)."""

    async def publish_batch(self, cmd: "NatsPublishCommand") -> None:
        """Reject the call: batch publishing is not supported.

        Raises:
            FeatureNotSupportedException: always.
        """
        raise FeatureNotSupportedException(
            "NATS doesn't support publishing in batches.",
        )
class NatsFastProducerImpl(NatsFastProducer):
    """Producer that publishes and requests through a NATS client connection."""

    _decoder: "AsyncCallable"
    _parser: "AsyncCallable"

    def __init__(
        self,
        parser: Optional["CustomCallable"],
        decoder: Optional["CustomCallable"],
    ) -> None:
        """Set up the parser/decoder pair and start without a connection.

        Args:
            parser: Optional custom parser, composed with the default
                ``NatsParser.parse_message``.
            decoder: Optional custom decoder, composed with the default
                ``NatsParser.decode_message``.
        """
        self.serializer: SerializerProto | None = None

        fallback = NatsParser(pattern="", is_ack_disabled=True)
        self._parser = ParserComposition(parser, fallback.parse_message)
        self._decoder = ParserComposition(decoder, fallback.decode_message)

        self.__state: ConnectionState[Client] = EmptyConnectionState()

    def connect(
        self,
        connection: "Client",
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Remember ``serializer`` and wrap ``connection`` in a connected state."""
        self.serializer = serializer
        self.__state = ConnectedState(connection)

    def disconnect(self) -> None:
        """Replace the current state with an empty connection state."""
        self.__state = EmptyConnectionState()

    @override
    async def publish(self, cmd: "NatsPublishCommand") -> None:
        """Encode ``cmd.body`` and publish it to ``cmd.destination``.

        The ``content-type`` header is set from the encoder result (empty
        string when there is none); headers supplied by ``cmd`` are applied
        afterwards and therefore take precedence.
        """
        body, content_type = encode_message(cmd.body, self.serializer)

        outgoing_headers = {"content-type": content_type or ""}
        outgoing_headers.update(cmd.headers_to_publish())

        client = self.__state.connection
        return await client.publish(
            subject=cmd.destination,
            payload=body,
            reply=cmd.reply_to,
            headers=outgoing_headers,
        )

    @override
    async def request(self, cmd: "NatsPublishCommand") -> "Msg":
        """Encode ``cmd.body``, send it as a request and return the reply.

        Headers are built the same way as in ``publish``; ``cmd.timeout`` is
        forwarded to the connection's ``request`` call.
        """
        body, content_type = encode_message(cmd.body, self.serializer)

        outgoing_headers = {"content-type": content_type or ""}
        outgoing_headers.update(cmd.headers_to_publish())

        client = self.__state.connection
        return await client.request(
            subject=cmd.destination,
            payload=body,
            headers=outgoing_headers,
            timeout=cmd.timeout,
        )
class NatsJSFastProducer(NatsFastProducer):
    """A class to represent a NATS JetStream producer."""

    _decoder: "AsyncCallable"
    _parser: "AsyncCallable"

    def __init__(
        self,
        *,
        parser: Optional["CustomCallable"],
        decoder: Optional["CustomCallable"],
    ) -> None:
        """Set up the parser/decoder pair and start without a connection.

        Args:
            parser: Optional custom parser, composed with the default
                ``NatsParser.parse_message``.
            decoder: Optional custom decoder, composed with the default
                ``NatsParser.decode_message``.
        """
        self.serializer: SerializerProto | None = None

        default = NatsParser(pattern="", is_ack_disabled=True)
        self._parser = ParserComposition(parser, default.parse_message)
        self._decoder = ParserComposition(decoder, default.decode_message)

        self.__state: ConnectionState[JetStreamContext] = EmptyConnectionState()

    def connect(
        self,
        connection: "JetStreamContext",
        serializer: Optional["SerializerProto"],
    ) -> None:
        """Store ``serializer`` and wrap ``connection`` in a connected state."""
        self.serializer = serializer
        self.__state = ConnectedState(connection)

    def disconnect(self) -> None:
        """Replace the current state with an empty connection state."""
        self.__state = EmptyConnectionState()

    @override
    async def publish(self, cmd: "NatsPublishCommand") -> "PubAck":
        """Encode ``cmd.body`` and publish it through the JetStream context.

        The ``content-type`` header comes from the encoder (empty string when
        there is none); headers supplied by ``cmd`` are merged afterwards and
        take precedence. ``cmd.stream`` and ``cmd.timeout`` are forwarded.

        Returns:
            The value returned by the JetStream ``publish`` call.
        """
        payload, content_type = encode_message(cmd.body, self.serializer)

        headers_to_send = {
            "content-type": content_type or "",
            **cmd.headers_to_publish(js=True),
        }

        return await self.__state.connection.publish(
            subject=cmd.destination,
            payload=payload,
            headers=headers_to_send,
            stream=cmd.stream,
            timeout=cmd.timeout,
        )

    @override
    async def request(self, cmd: "NatsPublishCommand") -> "Msg":
        """Publish ``cmd`` through JetStream and wait for a single reply.

        A fresh inbox is subscribed on the underlying client with a future
        that receives the first message. The inbox subject is sent to the
        consumer in the ``reply_to`` header, and both the publish and the wait
        for the reply run under one ``cmd.timeout`` deadline.

        Returns:
            The reply message delivered to the inbox.

        Raises:
            TimeoutError: if the publish plus reply exceed ``cmd.timeout``.
            nats.errors.NoRespondersError: if the reply carries the
                "no responders" status header.
        """
        payload, content_type = encode_message(cmd.body, self.serializer)

        reply_to = self.__state.connection._nc.new_inbox()
        future: asyncio.Future[Msg] = asyncio.Future()
        sub = await self.__state.connection._nc.subscribe(
            reply_to,
            future=future,
            max_msgs=1,
        )

        try:
            # Ask for automatic removal of the subscription after one message.
            await sub.unsubscribe(limit=1)

            headers_to_send = {
                "content-type": content_type or "",
                "reply_to": reply_to,
                **cmd.headers_to_publish(js=False),
            }

            with anyio.fail_after(cmd.timeout):
                await self.__state.connection.publish(
                    subject=cmd.destination,
                    payload=payload,
                    headers=headers_to_send,
                    stream=cmd.stream,
                    timeout=cmd.timeout,
                )

                msg = await future
        except Exception:
            # No reply was consumed, so the auto-unsubscribe never triggered:
            # drop the inbox subscription explicitly instead of leaking it.
            # This runs outside the ``fail_after`` scope, so awaiting here is
            # not cancelled again by the expired deadline.
            # NOTE(review): external task cancellation (``CancelledError``) is
            # not an ``Exception`` and is intentionally left untouched here.
            future.cancel()
            try:
                await sub.unsubscribe()
            except nats.errors.Error:
                # Best-effort cleanup (e.g. connection already closed); the
                # original failure below is the one the caller needs to see.
                pass
            raise

        if (  # pragma: no cover
            msg.headers and (msg.headers.get(Header.STATUS) == NO_RESPONDERS_STATUS)
        ):
            raise nats.errors.NoRespondersError

        return msg
class FakeNatsFastProducer(NatsFastProducer):
    """Placeholder producer whose operations all raise.

    Every method raises ``NotImplementedError`` except ``publish_batch``,
    which raises ``FeatureNotSupportedException``.
    """

    def connect(self, connection: Any, serializer: Optional["SerializerProto"]) -> None:
        """Not available on the fake producer; always raises."""
        raise NotImplementedError()

    def disconnect(self) -> None:
        """Not available on the fake producer; always raises."""
        raise NotImplementedError()

    @override
    async def publish(self, cmd: "NatsPublishCommand") -> None:
        """Not available on the fake producer; always raises."""
        raise NotImplementedError()

    @override
    async def request(self, cmd: "NatsPublishCommand") -> "Msg":
        """Not available on the fake producer; always raises."""
        raise NotImplementedError()

    @override
    async def publish_batch(self, cmd: "NatsPublishCommand") -> None:
        """Reject the call: batch publishing is not supported.

        Raises:
            FeatureNotSupportedException: always.
        """
        raise FeatureNotSupportedException(
            "NATS doesn't support publishing in batches.",
        )