Coverage for faststream / nats / publisher / usecase.py: 83%
44 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Iterable
2from typing import TYPE_CHECKING, Any, Optional, Union, cast
4from typing_extensions import overload, override
6from faststream._internal.endpoint.publisher import PublisherUsecase
7from faststream.message import gen_cor_id
8from faststream.nats.response import NatsPublishCommand
9from faststream.nats.schemas.js_stream import compile_nats_wildcard
10from faststream.response.publish_type import PublishType
12if TYPE_CHECKING:
13 from faststream._internal.basic_types import SendableMessage
14 from faststream._internal.endpoint.publisher import PublisherSpecification
15 from faststream._internal.producer import ProducerProto
16 from faststream._internal.types import PublisherMiddleware
17 from faststream.nats.configs import NatsBrokerConfig
18 from faststream.nats.message import NatsMessage
19 from faststream.nats.schemas import PubAck
20 from faststream.response.response import PublishCommand
22 from .config import NatsPublisherConfig
25class LogicPublisher(PublisherUsecase):
26 """A class to represent a NATS publisher."""
28 _outer_config: "NatsBrokerConfig"
30 def __init__(
31 self,
32 config: "NatsPublisherConfig",
33 specification: "PublisherSpecification[Any, Any]",
34 ) -> None:
35 """Initialize NATS publisher object."""
36 super().__init__(config, specification)
38 self._subject = config.subject
39 self.stream = config.stream
40 self.timeout = config.timeout or 0.5
41 self.headers = config.headers or {}
42 self.reply_to = config.reply_to
44 @property
45 def clear_subject(self) -> str:
46 """Compile `test.{name}` to `test.*` subject."""
47 _, path = compile_nats_wildcard(self.subject)
48 return path
50 @property
51 def subject(self) -> str:
52 return f"{self._outer_config.prefix}{self._subject}"
54 @overload
55 async def publish(
56 self,
57 message: "SendableMessage",
58 subject: str = "",
59 headers: dict[str, str] | None = None,
60 reply_to: str = "",
61 correlation_id: str | None = None,
62 stream: None = None,
63 timeout: float | None = None,
64 ) -> None: ...
66 @overload
67 async def publish(
68 self,
69 message: "SendableMessage",
70 subject: str = "",
71 headers: dict[str, str] | None = None,
72 reply_to: str = "",
73 correlation_id: str | None = None,
74 stream: str | None = None,
75 timeout: float | None = None,
76 ) -> "PubAck": ...
78 @override
79 async def publish(
80 self,
81 message: "SendableMessage",
82 subject: str = "",
83 headers: dict[str, str] | None = None,
84 reply_to: str = "",
85 correlation_id: str | None = None,
86 stream: str | None = None,
87 timeout: float | None = None,
88 ) -> Optional["PubAck"]:
89 """Publish message directly.
91 Args:
92 message:
93 Message body to send.
94 Can be any encodable object (native python types or `pydantic.BaseModel`).
95 subject:
96 NATS subject to send message.
97 headers:
98 Message headers to store metainformation.
99 **content-type** and **correlation_id** will be set automatically by framework anyway.
100 reply_to:
101 NATS subject name to send response.
102 correlation_id:
103 Manual message **correlation_id** setter.
104 **correlation_id** is a useful option to trace messages.
105 stream:
106 This option validates that the target subject is in presented stream.
107 Can be omitted without any effect if you doesn't want PubAck frame.
108 timeout:
109 Timeout to send message to NATS.
111 Returns:
112 `None` if you publishes a regular message.
113 `faststream.nats.PubAck` if you publishes a message to stream.
114 """
115 cmd = NatsPublishCommand(
116 message,
117 subject=subject or self.subject,
118 headers=self.headers | (headers or {}),
119 reply_to=reply_to or self.reply_to,
120 correlation_id=correlation_id or gen_cor_id(),
121 stream=stream or getattr(self.stream, "name", None),
122 timeout=timeout or self.timeout,
123 _publish_type=PublishType.PUBLISH,
124 )
126 response: PubAck | None
127 if cmd.stream: 127 ↛ 128line 127 didn't jump to line 128 because the condition on line 127 was never true
128 response = cast(
129 "PubAck",
130 await self._basic_publish(
131 cmd,
132 producer=self._outer_config.js_producer,
133 _extra_middlewares=(),
134 ),
135 )
136 else:
137 response = await self._basic_publish(
138 cmd,
139 producer=self._outer_config.producer,
140 _extra_middlewares=(),
141 )
143 return response
145 @override
146 async def _publish(
147 self,
148 cmd: Union["PublishCommand", "NatsPublishCommand"],
149 *,
150 _extra_middlewares: Iterable["PublisherMiddleware"],
151 ) -> None:
152 """This method should be called in subscriber flow only."""
153 cmd = NatsPublishCommand.from_cmd(cmd)
155 cmd.destination = self.subject
156 cmd.add_headers(self.headers, override=False)
157 cmd.reply_to = cmd.reply_to or self.reply_to
159 if self.stream: 159 ↛ 160line 159 didn't jump to line 160 because the condition on line 159 was never true
160 cmd.stream = self.stream.name
161 cmd.timeout = self.timeout
163 if cmd.stream: 163 ↛ 164line 163 didn't jump to line 164 because the condition on line 163 was never true
164 producer: ProducerProto[Any] = self._outer_config.js_producer
165 else:
166 producer = self._outer_config.producer
168 await self._basic_publish(
169 cmd,
170 producer=producer,
171 _extra_middlewares=_extra_middlewares,
172 )
174 @override
175 async def request(
176 self,
177 message: "SendableMessage",
178 subject: str = "",
179 headers: dict[str, str] | None = None,
180 correlation_id: str | None = None,
181 stream: str | None = None,
182 timeout: float = 0.5,
183 ) -> "NatsMessage":
184 """Make a synchronous request to outer subscriber.
186 If out subscriber listens subject by stream, you should setup the same **stream** explicitly.
187 Another way you will reseave confirmation frame as a response.
189 Args:
190 message:
191 Message body to send.
192 Can be any encodable object (native python types or `pydantic.BaseModel`).
193 subject:
194 NATS subject to send message.
195 headers:
196 Message headers to store metainformation.
197 **content-type** and **correlation_id** will be set automatically by framework anyway.
198 correlation_id:
199 Manual message **correlation_id** setter.
200 **correlation_id** is a useful option to trace messages.
201 stream:
202 This allows to make RPC calls over JetStream subjects.
203 timeout:
204 Timeout to send message to NATS.
206 Returns:
207 `faststream.nats.message.NatsMessage` object as an outer subscriber response.
208 """
209 cmd = NatsPublishCommand(
210 message=message,
211 subject=subject or self.subject,
212 headers=self.headers | (headers or {}),
213 timeout=timeout or self.timeout,
214 correlation_id=correlation_id or gen_cor_id(),
215 stream=stream or getattr(self.stream, "name", None),
216 _publish_type=PublishType.REQUEST,
217 )
219 if cmd.stream:
220 producer: ProducerProto[Any] = self._outer_config.js_producer
221 else:
222 producer = self._outer_config.producer
224 msg: NatsMessage = await self._basic_request(cmd, producer=producer)
225 return msg