Coverage for tests / brokers / kafka / test_test_client.py: 99%

167 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from unittest.mock import AsyncMock, patch 

3 

4import pytest 

5 

6from faststream import AckPolicy, BaseMiddleware 

7from faststream.kafka import TopicPartition 

8from faststream.kafka.annotations import KafkaMessage 

9from faststream.kafka.message import FAKE_CONSUMER 

10from faststream.kafka.testing import FakeProducer 

11from tests.brokers.base.testclient import BrokerTestclientTestcase 

12from tests.tools import spy_decorator 

13 

14from .basic import KafkaMemoryTestcaseConfig 

15 

16 

17@pytest.mark.kafka() 

18@pytest.mark.asyncio() 

19class TestTestclient(KafkaMemoryTestcaseConfig, BrokerTestclientTestcase): 

20 async def test_partition_match( 

21 self, 

22 queue: str, 

23 ) -> None: 

24 broker = self.get_broker() 

25 

26 @broker.subscriber(partitions=[TopicPartition(queue, 1)]) 

27 async def m(msg) -> None: 

28 pass 

29 

30 async with self.patch_broker(broker) as br: 

31 await br.publish("hello", queue) 

32 

33 m.mock.assert_called_once_with("hello") 

34 

35 async def test_partition_match_exect( 

36 self, 

37 queue: str, 

38 ) -> None: 

39 broker = self.get_broker() 

40 

41 @broker.subscriber(partitions=[TopicPartition(queue, 1)]) 

42 async def m(msg) -> None: 

43 pass 

44 

45 async with self.patch_broker(broker) as br: 

46 await br.publish("hello", queue, partition=1) 

47 

48 m.mock.assert_called_once_with("hello") 

49 

50 async def test_partition_mismatch( 

51 self, 

52 queue: str, 

53 ) -> None: 

54 broker = self.get_broker() 

55 

56 @broker.subscriber(partitions=[TopicPartition(queue, 1)]) 

57 async def m(msg) -> None: 

58 pass 

59 

60 @broker.subscriber(queue) 

61 async def m2(msg) -> None: 

62 pass 

63 

64 async with self.patch_broker(broker) as br: 

65 await br.publish("hello", queue, partition=2) 

66 

67 assert not m.mock.called 

68 m2.mock.assert_called_once_with("hello") 

69 

70 async def test_message_nack_seek( 

71 self, 

72 queue: str, 

73 ) -> None: 

74 broker = self.get_broker(apply_types=True) 

75 

76 @broker.subscriber(queue, group_id=f"{queue}1", ack_policy=AckPolicy.MANUAL) 

77 async def m(msg: KafkaMessage) -> None: 

78 await msg.nack() 

79 

80 async with self.patch_broker(broker) as br: 

81 with patch.object( 

82 FAKE_CONSUMER, 

83 "seek", 

84 spy_decorator(FAKE_CONSUMER.seek), 

85 ) as mocked: 

86 await br.publish("hello", queue) 

87 mocked.mock.assert_called_once() 

88 

89 async def test_publisher_autoflush_mock( 

90 self, 

91 queue: str, 

92 ) -> None: 

93 broker = self.get_broker() 

94 

95 publisher = broker.publisher(queue + "1", autoflush=True) 

96 publisher.flush = AsyncMock() 

97 

98 @publisher 

99 @broker.subscriber(queue) 

100 async def m(msg): 

101 return 1 

102 

103 async with self.patch_broker(broker) as br: 

104 await br.publish("hello", queue) 

105 

106 m.mock.assert_called_once_with("hello") 

107 publisher.mock.assert_called_once_with(1) 

108 

109 publisher.flush.assert_awaited_once() 

110 

111 async def test_batch_publisher_autoflush_mock( 

112 self, 

113 queue: str, 

114 ) -> None: 

115 broker = self.get_broker() 

116 

117 publisher = broker.publisher(queue + "1", batch=True, autoflush=True) 

118 publisher.flush = AsyncMock() 

119 

120 @publisher 

121 @broker.subscriber(queue) 

122 async def m(msg): 

123 return 1, 2, 3 

124 

125 async with self.patch_broker(broker) as br: 

126 await br.publish("hello", queue) 

127 

128 m.mock.assert_called_once_with("hello") 

129 publisher.mock.assert_called_once_with([1, 2, 3]) 

130 

131 publisher.flush.assert_awaited_once() 

132 

133 @pytest.mark.connected() 

134 async def test_with_real_testclient( 

135 self, 

136 queue: str, 

137 ) -> None: 

138 event = asyncio.Event() 

139 

140 broker = self.get_broker() 

141 

142 @broker.subscriber(queue) 

143 def subscriber(m) -> None: 

144 event.set() 

145 

146 async with self.patch_broker(broker, with_real=True) as br: 

147 await asyncio.wait( 

148 ( 

149 asyncio.create_task(br.publish("hello", queue)), 

150 asyncio.create_task(event.wait()), 

151 ), 

152 timeout=3, 

153 ) 

154 

155 assert event.is_set() 

156 

157 async def test_batch_pub_by_default_pub( 

158 self, 

159 queue: str, 

160 ) -> None: 

161 broker = self.get_broker() 

162 

163 @broker.subscriber(queue, batch=True) 

164 async def m(msg) -> None: 

165 pass 

166 

167 async with self.patch_broker(broker) as br: 

168 await br.publish("hello", queue) 

169 m.mock.assert_called_once_with(["hello"]) 

170 

171 async def test_batch_pub_by_pub_batch( 

172 self, 

173 queue: str, 

174 ) -> None: 

175 broker = self.get_broker() 

176 

177 @broker.subscriber(queue, batch=True) 

178 async def m(msg) -> None: 

179 pass 

180 

181 async with self.patch_broker(broker) as br: 

182 await br.publish_batch("hello", topic=queue) 

183 m.mock.assert_called_once_with(["hello"]) 

184 

185 async def test_batch_publisher_mock( 

186 self, 

187 queue: str, 

188 ) -> None: 

189 broker = self.get_broker() 

190 

191 publisher = broker.publisher(queue + "1", batch=True) 

192 

193 @publisher 

194 @broker.subscriber(queue) 

195 async def m(msg): 

196 return 1, 2, 3 

197 

198 async with self.patch_broker(broker) as br: 

199 await br.publish("hello", queue) 

200 m.mock.assert_called_once_with("hello") 

201 publisher.mock.assert_called_once_with([1, 2, 3]) 

202 

203 async def test_respect_middleware(self, queue: str) -> None: 

204 routes = [] 

205 

206 class Middleware(BaseMiddleware): 

207 async def on_receive(self) -> None: 

208 routes.append(None) 

209 return await super().on_receive() 

210 

211 broker = self.get_broker(middlewares=(Middleware,)) 

212 

213 @broker.subscriber(queue) 

214 async def h1(msg) -> None: ... 

215 

216 @broker.subscriber(queue + "1") 

217 async def h2(msg) -> None: ... 

218 

219 async with self.patch_broker(broker) as br: 

220 await br.publish("", queue) 

221 await br.publish("", queue + "1") 

222 

223 assert len(routes) == 2 

224 

225 @pytest.mark.connected() 

226 async def test_real_respect_middleware(self, queue: str) -> None: 

227 routes = [] 

228 

229 class Middleware(BaseMiddleware): 

230 async def on_receive(self) -> None: 

231 routes.append(None) 

232 return await super().on_receive() 

233 

234 broker = self.get_broker(middlewares=(Middleware,)) 

235 

236 @broker.subscriber(queue) 

237 async def h1(msg) -> None: ... 

238 

239 @broker.subscriber(queue + "1") 

240 async def h2(msg) -> None: ... 

241 

242 async with self.patch_broker(broker, with_real=True) as br: 

243 await br.publish("", queue) 

244 await br.publish("", queue + "1") 

245 await h1.wait_call(3) 

246 await h2.wait_call(3) 

247 

248 assert len(routes) == 2 

249 

250 async def test_multiple_subscribers_different_groups( 

251 self, 

252 queue: str, 

253 ) -> None: 

254 test_broker = self.get_broker() 

255 

256 @test_broker.subscriber(queue, group_id="group1") 

257 async def subscriber1(msg) -> None: ... 

258 

259 @test_broker.subscriber(queue, group_id="group2") 

260 async def subscriber2(msg) -> None: ... 

261 

262 async with self.patch_broker(test_broker) as br: 

263 await br.start() 

264 await br.publish("", queue) 

265 

266 assert subscriber1.mock.call_count == 1 

267 assert subscriber2.mock.call_count == 1 

268 

269 async def test_multiple_subscribers_same_group(self, queue: str) -> None: 

270 broker = self.get_broker() 

271 

272 @broker.subscriber(queue, group_id="group1") 

273 async def subscriber1(msg) -> None: ... 

274 

275 @broker.subscriber(queue, group_id="group1") 

276 async def subscriber2(msg) -> None: ... 

277 

278 async with self.patch_broker(broker) as br: 

279 await br.start() 

280 await br.publish("", queue) 

281 

282 # we can't guarantee the order of calls 

283 assert {subscriber1.mock.call_count, subscriber2.mock.call_count} == {1, 0} 

284 

285 async def test_multiple_batch_subscriber_with_different_group( 

286 self, 

287 queue: str, 

288 ) -> None: 

289 broker = self.get_broker() 

290 

291 @broker.subscriber(queue, batch=True, group_id="group1") 

292 async def subscriber1(msg) -> None: ... 

293 

294 @broker.subscriber(queue, batch=True, group_id="group2") 

295 async def subscriber2(msg) -> None: ... 

296 

297 async with self.patch_broker(broker) as br: 

298 await br.start() 

299 await br.publish("", queue) 

300 

301 assert subscriber1.mock.call_count == 1 

302 assert subscriber2.mock.call_count == 1 

303 

304 async def test_multiple_batch_subscriber_with_same_group( 

305 self, 

306 queue: str, 

307 ) -> None: 

308 broker = self.get_broker() 

309 

310 @broker.subscriber(queue, batch=True, group_id="group1") 

311 async def subscriber1(msg) -> None: ... 

312 

313 @broker.subscriber(queue, batch=True, group_id="group1") 

314 async def subscriber2(msg) -> None: ... 

315 

316 async with self.patch_broker(broker) as br: 

317 await br.start() 

318 await br.publish("", queue) 

319 

320 # we can't guarantee the order of calls 

321 assert {subscriber1.mock.call_count, subscriber2.mock.call_count} == {1, 0} 

322 

323 @pytest.mark.connected() 

324 async def test_broker_gets_patched_attrs_within_cm(self) -> None: 

325 await super().test_broker_gets_patched_attrs_within_cm(FakeProducer) 

326 

327 @pytest.mark.connected() 

328 async def test_broker_with_real_doesnt_get_patched(self) -> None: 

329 await super().test_broker_with_real_doesnt_get_patched() 

330 

331 @pytest.mark.connected() 

332 async def test_broker_with_real_patches_publishers_and_subscribers( 

333 self, 

334 queue: str, 

335 ) -> None: 

336 await super().test_broker_with_real_patches_publishers_and_subscribers(queue) 

337 

338 @pytest.mark.xfail(reason="https://github.com/ag2ai/faststream/issues/2513") 

339 async def test_publisher_without_destination(self) -> None: 

340 """Fixes https://github.com/ag2ai/faststream/issues/2513.""" 

341 broker = self.get_broker() 

342 

343 # use two publishers to check that we don't have conflicts 

344 publisher = broker.publisher(topic="") 

345 another_publisher = broker.publisher(topic="") 

346 

347 async with self.patch_broker(broker): 

348 await publisher.publish(None, topic="new-key") 

349 publisher.mock.assert_called_once() 

350 

351 await another_publisher.publish(None, topic="new-key") 

352 another_publisher.mock.assert_called_once()