Coverage for tests / brokers / confluent / test_test_client.py: 99%

146 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from unittest.mock import AsyncMock, patch 

3 

4import pytest 

5 

6from faststream import AckPolicy, BaseMiddleware 

7from faststream.confluent.annotations import KafkaMessage 

8from faststream.confluent.message import FAKE_CONSUMER 

9from faststream.confluent.testing import FakeProducer 

10from tests.brokers.base.testclient import BrokerTestclientTestcase 

11from tests.tools import spy_decorator 

12 

13from .basic import ConfluentMemoryTestcaseConfig 

14 

15 

16@pytest.mark.confluent() 

17@pytest.mark.asyncio() 

18class TestTestclient(ConfluentMemoryTestcaseConfig, BrokerTestclientTestcase): 

19 async def test_message_nack_seek(self, queue: str) -> None: 

20 broker = self.get_broker(apply_types=True) 

21 

22 @broker.subscriber( 

23 queue, 

24 group_id=f"{queue}-consume", 

25 auto_offset_reset="earliest", 

26 ack_policy=AckPolicy.REJECT_ON_ERROR, 

27 ) 

28 async def m(msg: KafkaMessage) -> None: 

29 await msg.nack() 

30 

31 async with self.patch_broker(broker) as br: 

32 with patch.object( 

33 FAKE_CONSUMER, 

34 "seek", 

35 spy_decorator(FAKE_CONSUMER.seek), 

36 ) as mocked: 

37 await br.publish("hello", queue) 

38 m.mock.assert_called_once_with("hello") 

39 mocked.mock.assert_called_once() 

40 

41 @pytest.mark.connected() 

42 async def test_with_real_testclient(self, queue: str) -> None: 

43 event = asyncio.Event() 

44 

45 broker = self.get_broker() 

46 

47 args, kwargs = self.get_subscriber_params(queue) 

48 

49 @broker.subscriber(*args, **kwargs) 

50 def subscriber(m) -> None: 

51 event.set() 

52 

53 async with self.patch_broker(broker, with_real=True) as br: 

54 await asyncio.wait( 

55 ( 

56 asyncio.create_task(br.publish("hello", queue)), 

57 asyncio.create_task(event.wait()), 

58 ), 

59 timeout=10, 

60 ) 

61 

62 assert event.is_set() 

63 

64 async def test_publisher_autoflush_mock(self, queue: str) -> None: 

65 broker = self.get_broker() 

66 

67 publisher = broker.publisher(queue + "1", autoflush=True) 

68 publisher.flush = AsyncMock() 

69 

70 @publisher 

71 @broker.subscriber(queue) 

72 async def m(msg): 

73 pass 

74 

75 async with self.patch_broker(broker) as br: 

76 await br.publish("hello", queue) 

77 

78 m.mock.assert_called_once_with("hello") 

79 publisher.mock.assert_called_once() 

80 

81 publisher.flush.assert_awaited_once() 

82 

83 async def test_batch_publisher_autoflush_mock(self, queue: str) -> None: 

84 broker = self.get_broker() 

85 

86 publisher = broker.publisher(queue + "1", batch=True, autoflush=True) 

87 publisher.flush = AsyncMock() 

88 

89 @publisher 

90 @broker.subscriber(queue) 

91 async def m(msg): 

92 return 1, 2, 3 

93 

94 async with self.patch_broker(broker) as br: 

95 await br.publish("hello", queue) 

96 

97 m.mock.assert_called_once_with("hello") 

98 publisher.mock.assert_called_once_with([1, 2, 3]) 

99 

100 publisher.flush.assert_awaited_once() 

101 

102 async def test_batch_pub_by_default_pub( 

103 self, 

104 queue: str, 

105 ) -> None: 

106 broker = self.get_broker() 

107 

108 @broker.subscriber(queue, batch=True) 

109 async def m(msg) -> None: 

110 pass 

111 

112 async with self.patch_broker(broker) as br: 

113 await br.publish("hello", queue) 

114 m.mock.assert_called_once_with(["hello"]) 

115 

116 async def test_batch_pub_by_pub_batch( 

117 self, 

118 queue: str, 

119 ) -> None: 

120 broker = self.get_broker() 

121 

122 @broker.subscriber(queue, batch=True) 

123 async def m(msg) -> None: 

124 pass 

125 

126 async with self.patch_broker(broker) as br: 

127 await br.publish_batch("hello", topic=queue) 

128 m.mock.assert_called_once_with(["hello"]) 

129 

130 async def test_batch_publisher_mock(self, queue: str) -> None: 

131 broker = self.get_broker() 

132 

133 publisher = broker.publisher(queue + "1", batch=True) 

134 

135 @publisher 

136 @broker.subscriber(queue) 

137 async def m(msg): 

138 return 1, 2, 3 

139 

140 async with self.patch_broker(broker) as br: 

141 await br.publish("hello", queue) 

142 m.mock.assert_called_once_with("hello") 

143 publisher.mock.assert_called_once_with([1, 2, 3]) 

144 

145 async def test_respect_middleware(self, queue: str) -> None: 

146 routes = [] 

147 

148 class Middleware(BaseMiddleware): 

149 async def on_receive(self) -> None: 

150 routes.append(None) 

151 return await super().on_receive() 

152 

153 broker = self.get_broker(middlewares=(Middleware,)) 

154 

155 @broker.subscriber(queue) 

156 async def h1(msg) -> None: ... 

157 

158 @broker.subscriber(queue + "1") 

159 async def h2(msg) -> None: ... 

160 

161 async with self.patch_broker(broker) as br: 

162 await br.publish("", queue) 

163 await br.publish("", queue + "1") 

164 

165 assert len(routes) == 2 

166 

167 @pytest.mark.connected() 

168 async def test_real_respect_middleware(self, queue: str) -> None: 

169 routes = [] 

170 

171 class Middleware(BaseMiddleware): 

172 async def on_receive(self) -> None: 

173 routes.append(None) 

174 return await super().on_receive() 

175 

176 broker = self.get_broker(middlewares=(Middleware,)) 

177 

178 args, kwargs = self.get_subscriber_params(queue) 

179 

180 @broker.subscriber(*args, **kwargs) 

181 async def h1(msg) -> None: ... 

182 

183 args2, kwargs2 = self.get_subscriber_params(queue + "1") 

184 

185 @broker.subscriber(*args2, **kwargs2) 

186 async def h2(msg) -> None: ... 

187 

188 async with self.patch_broker(broker, with_real=True) as br: 

189 await br.publish("", queue) 

190 await br.publish("", queue + "1") 

191 await h1.wait_call(10) 

192 await h2.wait_call(10) 

193 

194 assert len(routes) == 2 

195 

196 async def test_multiple_subscribers_different_groups(self, queue: str) -> None: 

197 broker = self.get_broker() 

198 

199 @broker.subscriber(queue, group_id="group1") 

200 async def subscriber1(msg) -> None: ... 

201 

202 @broker.subscriber(queue, group_id="group2") 

203 async def subscriber2(msg) -> None: ... 

204 

205 async with self.patch_broker(broker) as br: 

206 await br.start() 

207 await br.publish("", queue) 

208 

209 assert subscriber1.mock.call_count == 1 

210 assert subscriber2.mock.call_count == 1 

211 

212 async def test_multiple_subscribers_same_group(self, queue: str) -> None: 

213 broker = self.get_broker() 

214 

215 @broker.subscriber(queue, group_id="group1") 

216 async def subscriber1(msg) -> None: ... 

217 

218 @broker.subscriber(queue, group_id="group1") 

219 async def subscriber2(msg) -> None: ... 

220 

221 async with self.patch_broker(broker) as br: 

222 await br.start() 

223 await br.publish("", queue) 

224 

225 # we can't guarantee the order of calls 

226 assert {subscriber1.mock.call_count, subscriber2.mock.call_count} == {1, 0} 

227 

228 async def test_multiple_batch_subscriber_with_different_group( 

229 self, queue: str 

230 ) -> None: 

231 broker = self.get_broker() 

232 

233 @broker.subscriber(queue, batch=True, group_id="group1") 

234 async def subscriber1(msg) -> None: ... 

235 

236 @broker.subscriber(queue, batch=True, group_id="group2") 

237 async def subscriber2(msg) -> None: ... 

238 

239 async with self.patch_broker(broker) as br: 

240 await br.start() 

241 await br.publish("", queue) 

242 

243 assert subscriber1.mock.call_count == 1 

244 assert subscriber2.mock.call_count == 1 

245 

246 async def test_multiple_batch_subscriber_with_same_group(self, queue: str) -> None: 

247 broker = self.get_broker() 

248 

249 @broker.subscriber(queue, batch=True, group_id="group1") 

250 async def subscriber1(msg) -> None: ... 

251 

252 @broker.subscriber(queue, batch=True, group_id="group1") 

253 async def subscriber2(msg) -> None: ... 

254 

255 async with self.patch_broker(broker) as br: 

256 await br.start() 

257 await br.publish("", queue) 

258 

259 # we can't guarantee the order of calls 

260 assert {subscriber1.mock.call_count, subscriber2.mock.call_count} == {1, 0} 

261 

262 @pytest.mark.connected() 

263 async def test_broker_gets_patched_attrs_within_cm(self) -> None: 

264 await super().test_broker_gets_patched_attrs_within_cm(FakeProducer) 

265 

266 @pytest.mark.connected() 

267 async def test_broker_with_real_doesnt_get_patched(self) -> None: 

268 await super().test_broker_with_real_doesnt_get_patched() 

269 

270 @pytest.mark.connected() 

271 async def test_broker_with_real_patches_publishers_and_subscribers( 

272 self, queue: str 

273 ) -> None: 

274 await super().test_broker_with_real_patches_publishers_and_subscribers(queue) 

275 

276 @pytest.mark.xfail(reason="https://github.com/ag2ai/faststream/issues/2513") 

277 async def test_publisher_without_destination(self) -> None: 

278 """Fixes https://github.com/ag2ai/faststream/issues/2513.""" 

279 broker = self.get_broker() 

280 

281 # use two publishers to check that we don't have conflicts 

282 publisher = broker.publisher(topic="") 

283 another_publisher = broker.publisher(topic="") 

284 

285 async with self.patch_broker(broker): 

286 await publisher.publish(None, topic="new-key") 

287 publisher.mock.assert_called_once() 

288 

289 await another_publisher.publish(None, topic="new-key") 

290 another_publisher.mock.assert_called_once()