Coverage for faststream / nats / response.py: 92%
29 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Optional, Union
3from typing_extensions import override
5from faststream.response.publish_type import PublishType
6from faststream.response.response import PublishCommand, Response
8if TYPE_CHECKING:
9 from faststream._internal.basic_types import SendableMessage
10 from faststream.nats.schemas.schedule import Schedule
13class NatsResponse(Response):
14 def __init__(
15 self,
16 body: "SendableMessage",
17 *,
18 headers: dict[str, str] | None = None,
19 correlation_id: str | None = None,
20 stream: str | None = None,
21 schedule: Optional["Schedule"] = None,
22 ) -> None:
23 super().__init__(
24 body=body,
25 headers=headers,
26 correlation_id=correlation_id,
27 )
28 self.stream = stream
29 self.schedule = schedule
31 @override
32 def as_publish_command(self) -> "NatsPublishCommand":
33 return NatsPublishCommand(
34 message=self.body,
35 headers=self.headers,
36 correlation_id=self.correlation_id,
37 _publish_type=PublishType.PUBLISH,
38 # Nats specific
39 subject="",
40 stream=self.stream,
41 schedule=self.schedule,
42 )
45class NatsPublishCommand(PublishCommand):
46 def __init__(
47 self,
48 message: "SendableMessage",
49 *,
50 subject: str = "",
51 correlation_id: str | None = None,
52 headers: dict[str, str] | None = None,
53 reply_to: str = "",
54 stream: str | None = None,
55 timeout: float = 0.5,
56 schedule: Optional["Schedule"] = None,
57 _publish_type: PublishType,
58 ) -> None:
59 super().__init__(
60 body=message,
61 destination=subject,
62 correlation_id=correlation_id,
63 headers=headers,
64 reply_to=reply_to,
65 _publish_type=_publish_type,
66 )
68 self.stream = stream
69 self.timeout = timeout
70 self.schedule = schedule
72 def headers_to_publish(self, *, js: bool = False) -> dict[str, str]:
73 headers = {}
75 if self.correlation_id: 75 ↛ 78line 75 didn't jump to line 78 because the condition on line 75 was always true
76 headers["correlation_id"] = self.correlation_id
78 if js and self.reply_to: 78 ↛ 79line 78 didn't jump to line 79 because the condition on line 78 was never true
79 headers["reply_to"] = self.reply_to
81 if self.schedule:
82 headers["Nats-Schedule"] = f"@at {self.schedule.time.isoformat()}"
83 headers["Nats-Schedule-Target"] = self.schedule.target
85 return headers | self.headers
87 @classmethod
88 def from_cmd(
89 cls,
90 cmd: Union["PublishCommand", "NatsPublishCommand"],
91 ) -> "NatsPublishCommand":
92 if isinstance(cmd, NatsPublishCommand):
93 # NOTE: Should return a copy probably.
94 return cmd
96 return cls(
97 message=cmd.body,
98 subject=cmd.destination,
99 correlation_id=cmd.correlation_id,
100 headers=cmd.headers,
101 reply_to=cmd.reply_to,
102 _publish_type=cmd.publish_type,
103 )
105 def __repr__(self) -> str:
106 body = [f"body='{self.body}'", f"subject='{self.destination}'"]
107 if self.stream:
108 body.append(f"stream={self.stream}")
109 if self.reply_to:
110 body.append(f"reply_to='{self.reply_to}'")
111 body.extend((
112 f"headers={self.headers}",
113 f"correlation_id='{self.correlation_id}'",
114 f"publish_type={self.publish_type}",
115 ))
116 return f"{self.__class__.__name__}({', '.join(body)})"