Coverage for tests / brokers / nats / test_test_client.py: 97%

127 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2 

3import pytest 

4 

5from faststream import BaseMiddleware 

6from faststream.nats import ( 

7 ConsumerConfig, 

8 JStream, 

9 PullSub, 

10) 

11from faststream.nats.testing import FakeProducer 

12from tests.brokers.base.testclient import BrokerTestclientTestcase 

13 

14from .basic import NatsMemoryTestcaseConfig 

15 

16 

17@pytest.mark.nats() 

18@pytest.mark.asyncio() 

19class TestTestclient(NatsMemoryTestcaseConfig, BrokerTestclientTestcase): 

@pytest.mark.asyncio()
async def test_stream_publish(self, queue: str) -> None:
    """Publish with ``stream="test"`` to a subscriber registered on that same stream.

    The handler's mock must record exactly one call carrying the payload.
    """
    stream_name = "test"
    broker = self.get_broker(apply_types=False)

    @broker.subscriber(queue, stream=stream_name)
    async def handler(msg) -> None: ...

    async with self.patch_broker(broker) as patched:
        await patched.publish("Hi!", queue, stream=stream_name)
        # Checked while the broker is still patched.
        handler.mock.assert_called_once_with("Hi!")

33 

@pytest.mark.asyncio()
async def test_wrong_stream_publish(self, queue: str) -> None:
    """Publish with ``stream="test"`` to a subscriber registered without a stream.

    The handler's mock must not have been called.
    """
    broker = self.get_broker(apply_types=False)

    @broker.subscriber(queue)
    async def handler(msg) -> None: ...

    async with self.patch_broker(broker) as patched:
        await patched.publish("Hi!", queue, stream="test")
        assert not handler.mock.called

47 

@pytest.mark.connected()
async def test_with_real_testclient(self, queue: str) -> None:
    """Publish through a broker patched with ``with_real=True`` and expect delivery.

    The subscriber sets an event; the test waits up to 3 seconds for both the
    publish call and the event, then asserts the event was set.
    """
    received = asyncio.Event()
    broker = self.get_broker()

    @broker.subscriber(queue)
    def subscriber(m) -> None:
        received.set()

    async with self.patch_broker(broker, with_real=True) as patched:
        publish_task = asyncio.create_task(patched.publish("hello", queue))
        wait_task = asyncio.create_task(received.wait())
        await asyncio.wait((publish_task, wait_task), timeout=3)

    assert received.is_set()

71 

@pytest.mark.connected()
async def test_inbox_prefix_with_real(self, queue: str) -> None:
    """Check that ``inbox_prefix="test"`` shows up on the patched broker's connection.

    Both the connection's stored prefix and a newly generated inbox are inspected.
    """
    broker = self.get_broker(inbox_prefix="test")

    async with self.patch_broker(broker, with_real=True) as patched:
        assert patched._connection._inbox_prefix == b"test"

        new_inbox = str(patched._connection.new_inbox())
        assert "test" in new_inbox

82 

async def test_respect_middleware(self, queue: str) -> None:
    """Publish one message to each of two subscribers on a broker with a middleware.

    The middleware's ``on_receive`` must have run exactly twice in total.
    """
    received = []

    class RecordingMiddleware(BaseMiddleware):
        """Appends a marker on every ``on_receive`` before delegating to the base class."""

        async def on_receive(self) -> None:
            received.append(None)
            return await super().on_receive()

    broker = self.get_broker(middlewares=(RecordingMiddleware,))
    second_queue = queue + "1"

    @broker.subscriber(queue)
    async def h1(m) -> None: ...

    @broker.subscriber(second_queue)
    async def h2(m) -> None: ...

    async with self.patch_broker(broker) as patched:
        for subject in (queue, second_queue):
            await patched.publish("", subject)

    assert len(received) == 2

104 

@pytest.mark.connected()
async def test_real_respect_middleware(self, queue: str) -> None:
    """Same middleware check as the in-memory variant, but patched with ``with_real=True``.

    After publishing to both subjects the test awaits ``wait_call(3)`` on each
    handler, then asserts ``on_receive`` ran exactly twice in total.
    """
    received = []

    class RecordingMiddleware(BaseMiddleware):
        """Appends a marker on every ``on_receive`` before delegating to the base class."""

        async def on_receive(self) -> None:
            received.append(None)
            return await super().on_receive()

    broker = self.get_broker(middlewares=(RecordingMiddleware,))
    second_queue = queue + "1"

    @broker.subscriber(queue)
    async def h1(m) -> None: ...

    @broker.subscriber(second_queue)
    async def h2(m) -> None: ...

    async with self.patch_broker(broker, with_real=True) as patched:
        await patched.publish("", queue)
        await patched.publish("", second_queue)
        # Wait for the first handler, then the second, as in the original order.
        for handler in (h1, h2):
            await handler.wait_call(3)

    assert len(received) == 2

129 

async def test_js_subscriber_mock(self, queue: str, stream: JStream) -> None:
    """Publish to a subscriber bound to ``stream``, addressing the stream by name.

    The handler's mock must record exactly one call with the payload.
    """
    broker = self.get_broker()

    @broker.subscriber(queue, stream=stream)
    async def handler(msg) -> None: ...

    async with self.patch_broker(broker) as patched:
        await patched.publish("hello", queue, stream=stream.name)
        handler.mock.assert_called_once_with("hello")

144 

async def test_js_publisher_mock(self, queue: str, stream: JStream) -> None:
    """Check that a handler's return value is recorded on the decorating publisher's mock.

    The handler subscribes to ``queue`` on ``stream`` and returns ``"response"``;
    the publisher targets ``queue + "resp"``.
    """
    broker = self.get_broker()
    response_publisher = broker.publisher(queue + "resp")

    @response_publisher
    @broker.subscriber(queue, stream=stream)
    async def handler(msg) -> str:
        return "response"

    async with self.patch_broker(broker) as patched:
        await patched.publish("hello", queue, stream=stream.name)
        response_publisher.mock.assert_called_with("response")

162 

async def test_any_subject_routing(self) -> None:
    """A subscriber on ``test.*.subj.*`` receives a message sent to ``test.a.subj.b``."""
    broker = self.get_broker()

    @broker.subscriber("test.*.subj.*")
    def handler(msg) -> None: ...

    async with self.patch_broker(broker) as patched:
        await patched.publish("hello", "test.a.subj.b")
        handler.mock.assert_called_once_with("hello")

172 

async def test_ending_subject_routing(self) -> None:
    """A subscriber on ``test.>`` receives a message sent to ``test.a.subj.b``."""
    broker = self.get_broker()

    @broker.subscriber("test.>")
    def handler(msg) -> None: ...

    async with self.patch_broker(broker) as patched:
        await patched.publish("hello", "test.a.subj.b")
        handler.mock.assert_called_once_with("hello")

182 

async def test_mixed_subject_routing(self) -> None:
    """A subscriber on ``*.*.subj.>`` receives a message sent to ``test.a.subj.b.c``."""
    broker = self.get_broker()

    @broker.subscriber("*.*.subj.>")
    def handler(msg) -> None: ...

    async with self.patch_broker(broker) as patched:
        await patched.publish("hello", "test.a.subj.b.c")
        handler.mock.assert_called_once_with("hello")

192 

async def test_consume_pull(self, queue: str, stream: JStream) -> None:
    """A ``PullSub(1)`` subscriber on ``stream`` receives a message published to ``queue``.

    The handler's mock must record exactly one call with the payload.
    """
    broker = self.get_broker()
    pull_options = PullSub(1)

    @broker.subscriber(queue, stream=stream, pull_sub=pull_options)
    def handler(m) -> None: ...

    async with self.patch_broker(broker) as patched:
        await patched.publish("hello", queue)
        handler.mock.assert_called_once_with("hello")

206 

async def test_consume_batch(self, queue: str, stream: JStream) -> None:
    """A ``PullSub(1, batch=True)`` subscriber receives the published message as a list.

    The handler's mock must record exactly one call with ``["hello"]``.
    """
    broker = self.get_broker()
    batch_options = PullSub(1, batch=True)

    @broker.subscriber(queue, stream=stream, pull_sub=batch_options)
    def handler(m) -> None: ...

    async with self.patch_broker(broker) as patched:
        await patched.publish("hello", queue)
        handler.mock.assert_called_once_with(["hello"])

225 

async def test_consume_with_subject_filter(self, queue: str) -> None:
    """Only the subject listed in ``filter_subjects`` reaches the subscriber.

    The stream covers ``{queue}.*`` while the consumer filters on ``{queue}.a``;
    of the two published messages only the one sent to ``{queue}.a`` is expected.
    """
    broker = self.get_broker()
    consumer_config = ConsumerConfig(filter_subjects=[f"{queue}.a"])
    js_stream = JStream(queue, subjects=[f"{queue}.*"])

    @broker.subscriber(config=consumer_config, stream=js_stream)
    def handler(m) -> None: ...

    async with self.patch_broker(broker) as patched:
        await patched.publish(1, f"{queue}.b")
        await patched.publish(2, f"{queue}.a")
        handler.mock.assert_called_once_with(2)

240 

@pytest.mark.connected()
async def test_broker_gets_patched_attrs_within_cm(self) -> None:
    """Run the inherited check of the same name, passing ``FakeProducer`` to it."""
    inherited_check = super().test_broker_gets_patched_attrs_within_cm
    await inherited_check(FakeProducer)

244 

@pytest.mark.connected()
async def test_broker_with_real_doesnt_get_patched(self) -> None:
    """Run the inherited check of the same name under the ``connected`` marker."""
    inherited_check = super().test_broker_with_real_doesnt_get_patched
    await inherited_check()

248 

@pytest.mark.connected()
async def test_broker_with_real_patches_publishers_and_subscribers(
    self, queue: str
) -> None:
    """Run the inherited check of the same name, forwarding the ``queue`` fixture."""
    inherited_check = super().test_broker_with_real_patches_publishers_and_subscribers
    await inherited_check(queue)

255 

@pytest.mark.xfail(reason="https://github.com/ag2ai/faststream/issues/2513")
async def test_publisher_without_destination(self) -> None:
    """Regression test for https://github.com/ag2ai/faststream/issues/2513.

    Two publishers are created with an empty subject; each then publishes with
    an explicit ``subject`` and must record exactly one call on its own mock.
    """
    broker = self.get_broker()

    # Two publishers are used to check that they do not conflict with each other.
    first = broker.publisher(subject="")
    second = broker.publisher(subject="")

    async with self.patch_broker(broker):
        await first.publish(None, subject="new-key")
        first.mock.assert_called_once()

        await second.publish(None, subject="new-key")
        second.mock.assert_called_once()