Coverage for faststream / nats / publisher / producer.py: 94%

70 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from abc import abstractmethod 

3from typing import TYPE_CHECKING, Any, Optional 

4 

5import anyio 

6import nats 

7from nats.aio.client import NO_RESPONDERS_STATUS 

8from nats.js.api import Header 

9from typing_extensions import override 

10 

11from faststream._internal.endpoint.utils import ParserComposition 

12from faststream._internal.producer import ProducerProto 

13from faststream.exceptions import FeatureNotSupportedException 

14from faststream.message import encode_message 

15from faststream.nats.helpers.state import ( 

16 ConnectedState, 

17 ConnectionState, 

18 EmptyConnectionState, 

19) 

20from faststream.nats.parser import NatsParser 

21from faststream.nats.response import NatsPublishCommand 

22 

23if TYPE_CHECKING: 

24 from fast_depends.library.serializer import SerializerProto 

25 from nats.aio.client import Client 

26 from nats.aio.msg import Msg 

27 from nats.js import JetStreamContext 

28 

29 from faststream._internal.types import ( 

30 AsyncCallable, 

31 CustomCallable, 

32 ) 

33 from faststream.nats.schemas import PubAck 

34 

35 

36class NatsFastProducer(ProducerProto[NatsPublishCommand]): 

37 def connect( 

38 self, 

39 connection: Any, 

40 serializer: Optional["SerializerProto"], 

41 ) -> None: ... 

42 

43 def disconnect(self) -> None: ... 

44 

45 @abstractmethod 

46 async def publish(self, cmd: "NatsPublishCommand") -> Optional["PubAck"]: ... 

47 

48 @abstractmethod 

49 async def request(self, cmd: "NatsPublishCommand") -> "Msg": ... 

50 

51 async def publish_batch(self, cmd: "NatsPublishCommand") -> None: 

52 msg = "NATS doesn't support publishing in batches." 

53 raise FeatureNotSupportedException(msg) 

54 

55 

56class NatsFastProducerImpl(NatsFastProducer): 

57 """A class to represent a NATS producer.""" 

58 

59 _decoder: "AsyncCallable" 

60 _parser: "AsyncCallable" 

61 

62 def __init__( 

63 self, 

64 parser: Optional["CustomCallable"], 

65 decoder: Optional["CustomCallable"], 

66 ) -> None: 

67 self.serializer: SerializerProto | None = None 

68 

69 default = NatsParser(pattern="", is_ack_disabled=True) 

70 self._parser = ParserComposition(parser, default.parse_message) 

71 self._decoder = ParserComposition(decoder, default.decode_message) 

72 

73 self.__state: ConnectionState[Client] = EmptyConnectionState() 

74 

75 def connect( 

76 self, 

77 connection: "Client", 

78 serializer: Optional["SerializerProto"], 

79 ) -> None: 

80 self.serializer = serializer 

81 self.__state = ConnectedState(connection) 

82 

83 def disconnect(self) -> None: 

84 self.__state = EmptyConnectionState() 

85 

86 @override 

87 async def publish(self, cmd: "NatsPublishCommand") -> None: 

88 payload, content_type = encode_message(cmd.body, self.serializer) 

89 

90 headers_to_send = { 

91 "content-type": content_type or "", 

92 **cmd.headers_to_publish(), 

93 } 

94 

95 return await self.__state.connection.publish( 

96 subject=cmd.destination, 

97 payload=payload, 

98 reply=cmd.reply_to, 

99 headers=headers_to_send, 

100 ) 

101 

102 @override 

103 async def request(self, cmd: "NatsPublishCommand") -> "Msg": 

104 payload, content_type = encode_message(cmd.body, self.serializer) 

105 

106 headers_to_send = { 

107 "content-type": content_type or "", 

108 **cmd.headers_to_publish(), 

109 } 

110 

111 return await self.__state.connection.request( 

112 subject=cmd.destination, 

113 payload=payload, 

114 headers=headers_to_send, 

115 timeout=cmd.timeout, 

116 ) 

117 

118 

119class NatsJSFastProducer(NatsFastProducer): 

120 """A class to represent a NATS JetStream producer.""" 

121 

122 _decoder: "AsyncCallable" 

123 _parser: "AsyncCallable" 

124 

125 def __init__( 

126 self, 

127 *, 

128 parser: Optional["CustomCallable"], 

129 decoder: Optional["CustomCallable"], 

130 ) -> None: 

131 self.serializer: SerializerProto | None = None 

132 

133 default = NatsParser(pattern="", is_ack_disabled=True) 

134 self._parser = ParserComposition(parser, default.parse_message) 

135 self._decoder = ParserComposition(decoder, default.decode_message) 

136 

137 self.__state: ConnectionState[JetStreamContext] = EmptyConnectionState() 

138 

139 def connect( 

140 self, 

141 connection: "JetStreamContext", 

142 serializer: Optional["SerializerProto"], 

143 ) -> None: 

144 self.serializer = serializer 

145 self.__state = ConnectedState(connection) 

146 

147 def disconnect(self) -> None: 

148 self.__state = EmptyConnectionState() 

149 

150 @override 

151 async def publish(self, cmd: "NatsPublishCommand") -> "PubAck": 

152 payload, content_type = encode_message(cmd.body, self.serializer) 

153 

154 headers_to_send = { 

155 "content-type": content_type or "", 

156 **cmd.headers_to_publish(js=True), 

157 } 

158 

159 return await self.__state.connection.publish( 

160 subject=cmd.destination, 

161 payload=payload, 

162 headers=headers_to_send, 

163 stream=cmd.stream, 

164 timeout=cmd.timeout, 

165 ) 

166 

167 @override 

168 async def request(self, cmd: "NatsPublishCommand") -> "Msg": 

169 payload, content_type = encode_message(cmd.body, self.serializer) 

170 

171 reply_to = self.__state.connection._nc.new_inbox() 

172 future: asyncio.Future[Msg] = asyncio.Future() 

173 sub = await self.__state.connection._nc.subscribe( 

174 reply_to, 

175 future=future, 

176 max_msgs=1, 

177 ) 

178 await sub.unsubscribe(limit=1) 

179 

180 headers_to_send = { 

181 "content-type": content_type or "", 

182 "reply_to": reply_to, 

183 **cmd.headers_to_publish(js=False), 

184 } 

185 

186 with anyio.fail_after(cmd.timeout): 

187 await self.__state.connection.publish( 

188 subject=cmd.destination, 

189 payload=payload, 

190 headers=headers_to_send, 

191 stream=cmd.stream, 

192 timeout=cmd.timeout, 

193 ) 

194 

195 msg = await future 

196 

197 if ( # pragma: no cover 

198 msg.headers and (msg.headers.get(Header.STATUS) == NO_RESPONDERS_STATUS) 

199 ): 

200 raise nats.errors.NoRespondersError 

201 

202 return msg 

203 

204 

205class FakeNatsFastProducer(NatsFastProducer): 

206 def connect(self, connection: Any, serializer: Optional["SerializerProto"]) -> None: 

207 raise NotImplementedError 

208 

209 def disconnect(self) -> None: 

210 raise NotImplementedError 

211 

212 @override 

213 async def publish(self, cmd: "NatsPublishCommand") -> None: 

214 raise NotImplementedError 

215 

216 @override 

217 async def request(self, cmd: "NatsPublishCommand") -> "Msg": 

218 raise NotImplementedError 

219 

220 @override 

221 async def publish_batch(self, cmd: "NatsPublishCommand") -> None: 

222 msg = "NATS doesn't support publishing in batches." 

223 raise FeatureNotSupportedException(msg)