Coverage for faststream / nats / publisher / usecase.py: 83%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Iterable 

2from typing import TYPE_CHECKING, Any, Optional, Union, cast 

3 

4from typing_extensions import overload, override 

5 

6from faststream._internal.endpoint.publisher import PublisherUsecase 

7from faststream.message import gen_cor_id 

8from faststream.nats.response import NatsPublishCommand 

9from faststream.nats.schemas.js_stream import compile_nats_wildcard 

10from faststream.response.publish_type import PublishType 

11 

12if TYPE_CHECKING: 

13 from faststream._internal.basic_types import SendableMessage 

14 from faststream._internal.endpoint.publisher import PublisherSpecification 

15 from faststream._internal.producer import ProducerProto 

16 from faststream._internal.types import PublisherMiddleware 

17 from faststream.nats.configs import NatsBrokerConfig 

18 from faststream.nats.message import NatsMessage 

19 from faststream.nats.schemas import PubAck 

20 from faststream.response.response import PublishCommand 

21 

22 from .config import NatsPublisherConfig 

23 

24 

class LogicPublisher(PublisherUsecase):
    """NATS publisher use case.

    Builds `NatsPublishCommand` objects from publisher-level defaults and
    per-call arguments, then sends them through the JetStream producer when
    the command carries a stream, or through the regular producer otherwise.
    """

    _outer_config: "NatsBrokerConfig"

    def __init__(
        self,
        config: "NatsPublisherConfig",
        specification: "PublisherSpecification[Any, Any]",
    ) -> None:
        """Store publisher-level defaults read from `config`.

        Args:
            config:
                Publisher configuration. Its `subject`, `stream`, `timeout`,
                `headers` and `reply_to` attributes are copied to the instance.
            specification:
                Publisher specification, forwarded to the base class as is.
        """
        super().__init__(config, specification)

        self._subject = config.subject
        self.stream = config.stream
        # A falsy timeout / headers value falls back to the default below.
        self.timeout = config.timeout or 0.5
        self.headers = config.headers or {}
        self.reply_to = config.reply_to

    @property
    def clear_subject(self) -> str:
        """Full subject with placeholders compiled, e.g. `test.{name}` -> `test.*`."""
        _pattern, compiled_subject = compile_nats_wildcard(self.subject)
        return compiled_subject

    @property
    def subject(self) -> str:
        """Publisher subject prepended with the broker-level prefix."""
        prefix = self._outer_config.prefix
        return f"{prefix}{self._subject}"

    @overload
    async def publish(
        self,
        message: "SendableMessage",
        subject: str = "",
        headers: dict[str, str] | None = None,
        reply_to: str = "",
        correlation_id: str | None = None,
        stream: None = None,
        timeout: float | None = None,
    ) -> None: ...

    @overload
    async def publish(
        self,
        message: "SendableMessage",
        subject: str = "",
        headers: dict[str, str] | None = None,
        reply_to: str = "",
        correlation_id: str | None = None,
        stream: str | None = None,
        timeout: float | None = None,
    ) -> "PubAck": ...

    @override
    async def publish(
        self,
        message: "SendableMessage",
        subject: str = "",
        headers: dict[str, str] | None = None,
        reply_to: str = "",
        correlation_id: str | None = None,
        stream: str | None = None,
        timeout: float | None = None,
    ) -> Optional["PubAck"]:
        """Publish a message directly.

        Every falsy per-call argument falls back to the value configured on
        the publisher.

        Args:
            message:
                Message body to send.
                Can be any encodable object (native python types or `pydantic.BaseModel`).
            subject:
                NATS subject to send the message to.
            headers:
                Message headers to store meta information.
                They are merged over the publisher headers, so per-call keys win.
                **content-type** and **correlation_id** will be set automatically by the framework anyway.
            reply_to:
                NATS subject name to send the response to.
            correlation_id:
                Manual message **correlation_id** setter.
                **correlation_id** is a useful option to trace messages.
                A new one is generated when it is omitted.
            stream:
                This option validates that the target subject is in the given stream.
                Can be omitted without any effect if you don't want a PubAck frame.
            timeout:
                Timeout to send the message to NATS.

        Returns:
            `None` if you publish a regular message.
            `faststream.nats.PubAck` if you publish a message to a stream.
        """
        target_subject = subject or self.subject
        merged_headers = self.headers | (headers or {})
        target_reply_to = reply_to or self.reply_to
        message_cor_id = correlation_id or gen_cor_id()
        # `self.stream` may be unset, hence the guarded attribute lookup.
        target_stream = stream or getattr(self.stream, "name", None)
        send_timeout = timeout or self.timeout

        cmd = NatsPublishCommand(
            message,
            subject=target_subject,
            headers=merged_headers,
            reply_to=target_reply_to,
            correlation_id=message_cor_id,
            stream=target_stream,
            timeout=send_timeout,
            _publish_type=PublishType.PUBLISH,
        )

        # A command bound to a stream goes through the JetStream producer.
        producer: ProducerProto[Any] = (
            self._outer_config.js_producer
            if cmd.stream
            else self._outer_config.producer
        )

        ack = cast(
            "Optional[PubAck]",
            await self._basic_publish(
                cmd,
                producer=producer,
                _extra_middlewares=(),
            ),
        )
        return ack

    @override
    async def _publish(
        self,
        cmd: Union["PublishCommand", "NatsPublishCommand"],
        *,
        _extra_middlewares: Iterable["PublisherMiddleware"],
    ) -> None:
        """Publish a prepared command; should be called in subscriber flow only.

        The command is converted to a `NatsPublishCommand`, always retargeted
        to this publisher's subject, and completed with the publisher defaults.

        Args:
            cmd:
                Command to publish.
            _extra_middlewares:
                Additional middlewares forwarded to `_basic_publish`.
        """
        nats_cmd = NatsPublishCommand.from_cmd(cmd)

        nats_cmd.destination = self.subject
        # NOTE(review): `override=False` presumably keeps headers that are
        # already set on the command -- confirm against `add_headers`.
        nats_cmd.add_headers(self.headers, override=False)
        nats_cmd.reply_to = nats_cmd.reply_to or self.reply_to

        # The publisher timeout is applied only together with its stream.
        if self.stream:
            nats_cmd.stream = self.stream.name
            nats_cmd.timeout = self.timeout

        producer: ProducerProto[Any] = (
            self._outer_config.js_producer
            if nats_cmd.stream
            else self._outer_config.producer
        )

        await self._basic_publish(
            nats_cmd,
            producer=producer,
            _extra_middlewares=_extra_middlewares,
        )

    @override
    async def request(
        self,
        message: "SendableMessage",
        subject: str = "",
        headers: dict[str, str] | None = None,
        correlation_id: str | None = None,
        stream: str | None = None,
        timeout: float = 0.5,
    ) -> "NatsMessage":
        """Make a synchronous request to an outer subscriber.

        If the outer subscriber listens to the subject through a stream, you
        should set the same **stream** explicitly. Otherwise you will receive
        a confirmation frame as the response.

        Args:
            message:
                Message body to send.
                Can be any encodable object (native python types or `pydantic.BaseModel`).
            subject:
                NATS subject to send the message to.
            headers:
                Message headers to store meta information.
                They are merged over the publisher headers, so per-call keys win.
                **content-type** and **correlation_id** will be set automatically by the framework anyway.
            correlation_id:
                Manual message **correlation_id** setter.
                **correlation_id** is a useful option to trace messages.
            stream:
                This allows making RPC calls over JetStream subjects.
            timeout:
                Timeout to send the message to NATS.

        Returns:
            `faststream.nats.message.NatsMessage` object as the outer subscriber response.
        """
        target_subject = subject or self.subject
        merged_headers = self.headers | (headers or {})
        # The default argument is truthy, so the publisher-level timeout is
        # used only when a falsy value (e.g. `0`) is passed explicitly.
        wait_timeout = timeout or self.timeout
        message_cor_id = correlation_id or gen_cor_id()
        target_stream = stream or getattr(self.stream, "name", None)

        cmd = NatsPublishCommand(
            message=message,
            subject=target_subject,
            headers=merged_headers,
            timeout=wait_timeout,
            correlation_id=message_cor_id,
            stream=target_stream,
            _publish_type=PublishType.REQUEST,
        )

        producer: ProducerProto[Any] = (
            self._outer_config.js_producer
            if cmd.stream
            else self._outer_config.producer
        )

        reply: NatsMessage = await self._basic_request(cmd, producer=producer)
        return reply

225 return msg