Coverage for tests / brokers / rabbit / test_test_client.py: 97%

119 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2import datetime as dt 

3from typing import Any 

4from unittest.mock import MagicMock 

5 

6import pytest 

7from freezegun import freeze_time 

8 

9from faststream import BaseMiddleware 

10from faststream.exceptions import SubscriberNotFound 

11from faststream.rabbit import ( 

12 ExchangeType, 

13 RabbitBroker, 

14 RabbitExchange, 

15 RabbitQueue, 

16) 

17from faststream.rabbit.annotations import RabbitMessage 

18from faststream.rabbit.testing import FakeProducer, _is_handler_matches, apply_pattern 

19from tests.brokers.base.testclient import BrokerTestclientTestcase 

20 

21from .basic import RabbitMemoryTestcaseConfig 

22from .test_publish import TestPublishWithExchange as PublishWithExchangeCase 

23 

24_frozen_time = dt.datetime(2026, 2, 10, 12, 0, 0, tzinfo=dt.timezone.utc) 

25 

26 

27@pytest.mark.rabbit() 

28@pytest.mark.asyncio() 

29class TestTestclient( 

30 PublishWithExchangeCase, RabbitMemoryTestcaseConfig, BrokerTestclientTestcase 

31): 

32 @pytest.mark.connected() 

33 async def test_with_real_testclient( 

34 self, 

35 queue: str, 

36 ) -> None: 

37 event = asyncio.Event() 

38 

39 broker = self.get_broker() 

40 

41 @broker.subscriber(queue) 

42 def subscriber(m) -> None: 

43 event.set() 

44 

45 async with self.patch_broker(broker, with_real=True) as br: 

46 await asyncio.wait( 

47 ( 

48 asyncio.create_task(br.publish("hello", queue)), 

49 asyncio.create_task(event.wait()), 

50 ), 

51 timeout=3, 

52 ) 

53 

54 assert event.is_set() 

55 

56 async def test_direct_not_found( 

57 self, 

58 queue: str, 

59 ) -> None: 

60 broker = self.get_broker() 

61 

62 async with self.patch_broker(broker) as br: 

63 with pytest.raises(SubscriberNotFound): 

64 await br.request("", "") 

65 

66 @pytest.mark.xfail(reason="https://github.com/ag2ai/faststream/issues/2513") 

67 async def test_publisher_without_destination(self) -> None: 

68 """Fixes https://github.com/ag2ai/faststream/issues/2513.""" 

69 broker = self.get_broker() 

70 

71 # use two publishers to check that we don't have conflicts 

72 publisher = broker.publisher(exchange="test_exchange") 

73 another_publisher = broker.publisher(exchange="test_exchange") 

74 

75 async with self.patch_broker(broker): 

76 await publisher.publish(None, routing_key="new-key") 

77 publisher.mock.assert_called_once() 

78 

79 await another_publisher.publish(None, routing_key="new-key") 

80 another_publisher.mock.assert_called_once() 

81 

82 async def test_consume_manual_ack( 

83 self, 

84 queue: str, 

85 exchange: RabbitExchange, 

86 ) -> None: 

87 broker = self.get_broker(apply_types=True) 

88 

89 consume = asyncio.Event() 

90 consume2 = asyncio.Event() 

91 consume3 = asyncio.Event() 

92 

93 @broker.subscriber(queue=queue, exchange=exchange) 

94 async def handler(msg: RabbitMessage) -> None: 

95 await msg.raw_message.ack() 

96 consume.set() 

97 

98 @broker.subscriber(queue=queue + "1", exchange=exchange) 

99 async def handler2(msg: RabbitMessage) -> None: 

100 await msg.raw_message.nack() 

101 consume2.set() 

102 raise ValueError 

103 

104 @broker.subscriber(queue=queue + "2", exchange=exchange) 

105 async def handler3(msg: RabbitMessage) -> None: 

106 await msg.raw_message.reject() 

107 consume3.set() 

108 raise ValueError 

109 

110 async with self.patch_broker(broker) as br: 

111 await asyncio.wait( 

112 ( 

113 asyncio.create_task( 

114 br.publish("hello", queue=queue, exchange=exchange), 

115 ), 

116 asyncio.create_task( 

117 br.publish("hello", queue=queue + "1", exchange=exchange), 

118 ), 

119 asyncio.create_task( 

120 br.publish("hello", queue=queue + "2", exchange=exchange), 

121 ), 

122 asyncio.create_task(consume.wait()), 

123 asyncio.create_task(consume2.wait()), 

124 asyncio.create_task(consume3.wait()), 

125 ), 

126 timeout=3, 

127 ) 

128 

129 assert consume.is_set() 

130 assert consume2.is_set() 

131 assert consume3.is_set() 

132 

133 async def test_respect_middleware(self, queue: str) -> None: 

134 routes = [] 

135 

136 class Middleware(BaseMiddleware): 

137 async def on_receive(self) -> None: 

138 routes.append(None) 

139 return await super().on_receive() 

140 

141 broker = self.get_broker(middlewares=(Middleware,)) 

142 

143 @broker.subscriber(queue) 

144 async def h1(msg) -> None: ... 

145 

146 @broker.subscriber(queue + "1") 

147 async def h2(msg) -> None: ... 

148 

149 async with self.patch_broker(broker) as br: 

150 await br.publish("", queue) 

151 await br.publish("", queue + "1") 

152 

153 assert len(routes) == 2 

154 

155 @pytest.mark.connected() 

156 async def test_real_respect_middleware(self, queue: str) -> None: 

157 routes = [] 

158 

159 class Middleware(BaseMiddleware): 

160 async def on_receive(self) -> None: 

161 routes.append(None) 

162 return await super().on_receive() 

163 

164 broker = self.get_broker(middlewares=(Middleware,)) 

165 

166 @broker.subscriber(queue) 

167 async def h1(msg) -> None: ... 

168 

169 @broker.subscriber(queue + "1") 

170 async def h2(msg) -> None: ... 

171 

172 async with self.patch_broker(broker, with_real=True) as br: 

173 await br.publish("", queue) 

174 await br.publish("", queue + "1") 

175 await h1.wait_call(3) 

176 await h2.wait_call(3) 

177 

178 assert len(routes) == 2 

179 

180 @pytest.mark.connected() 

181 async def test_broker_gets_patched_attrs_within_cm(self) -> None: 

182 await super().test_broker_gets_patched_attrs_within_cm(FakeProducer) 

183 

184 @pytest.mark.connected() 

185 async def test_broker_with_real_doesnt_get_patched(self) -> None: 

186 await super().test_broker_with_real_doesnt_get_patched() 

187 

188 @pytest.mark.connected() 

189 async def test_broker_with_real_patches_publishers_and_subscribers( 

190 self, 

191 queue: str, 

192 ) -> None: 

193 await super().test_broker_with_real_patches_publishers_and_subscribers(queue) 

194 

195 @pytest.mark.asyncio() 

196 @pytest.mark.parametrize( 

197 ("expiration", "expected"), 

198 ( 

199 pytest.param(None, None, id="none"), 

200 pytest.param(1, 1, id="int"), 

201 pytest.param(1.5, 1.5, id="float"), 

202 pytest.param(dt.timedelta(seconds=1.1), 1.1, id="timedelta"), 

203 pytest.param(_frozen_time, 0, id="datetime"), 

204 ), 

205 ) 

206 @freeze_time(_frozen_time) 

207 async def test_publish_expiration_propagated( 

208 self, expiration: Any, expected: Any, queue: str, mock: MagicMock 

209 ) -> None: 

210 broker = self.get_broker(apply_types=True) 

211 

212 args, kwargs = self.get_subscriber_params(queue) 

213 

214 @broker.subscriber(*args, **kwargs) 

215 async def m(msg: RabbitMessage) -> None: 

216 mock(msg) 

217 

218 async with self.patch_broker(broker) as br: 

219 await br.start() 

220 await br.publish("hello", queue, expiration=expiration) 

221 msg = mock.call_args[0][0] 

222 assert msg.raw_message.expiration == expected 

223 

224 

225@pytest.mark.parametrize( 

226 ("pattern", "current", "result"), 

227 ( 

228 pytest.param("#", "1.2.3", True, id="#"), 

229 pytest.param("*", "1", True, id="*"), 

230 pytest.param("*", "1.2", False, id="* - broken"), 

231 pytest.param("test.*", "test.1", True, id="test.*"), 

232 pytest.param("test.#", "test.1", True, id="test.#"), 

233 pytest.param("#.test.#", "1.2.test.1.2", True, id="#.test.#"), 

234 pytest.param("#.test.*", "1.2.test.1", True, id="#.test.*"), 

235 pytest.param("#.test.*.*", "1.2.test.1.2", True, id="#.test.*."), 

236 pytest.param("#.test.*.*.*", "1.2.test.1.2", False, id="#.test.*.*.* - broken"), 

237 pytest.param( 

238 "#.test.*.test.#", 

239 "1.2.test.1.test.1.2", 

240 True, 

241 id="#.test.*.test.#", 

242 ), 

243 pytest.param("#.*.test", "1.2.2.test", True, id="#.*.test"), 

244 pytest.param("#.2.*.test", "1.2.2.test", True, id="#.2.*.test"), 

245 pytest.param("#.*.*.test", "1.2.2.test", True, id="#.*.*.test"), 

246 pytest.param("*.*.*.test", "1.2.test", False, id="*.*.*.test - broken"), 

247 pytest.param("#.*.*.test", "1.2.test", False, id="#.*.*.test - broken"), 

248 ), 

249) 

250def test(pattern: str, current: str, result: bool) -> None: 

251 assert apply_pattern(pattern, current) == result 

252 

253 

254exch_direct = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.DIRECT) 

255exch_fanout = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.FANOUT) 

256exch_topic = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.TOPIC) 

257exch_headers = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.HEADERS) 

258reqular_queue = RabbitQueue("test-reqular-queue", auto_delete=True) 

259 

260routing_key_queue = RabbitQueue( 

261 "test-routing-key-queue", 

262 auto_delete=True, 

263 routing_key="*.info", 

264) 

265one_key_queue = RabbitQueue( 

266 "test-one-key-queue", 

267 auto_delete=True, 

268 bind_arguments={"key": 1}, 

269) 

270any_keys_queue = RabbitQueue( 

271 "test-any-keys-queue", 

272 auto_delete=True, 

273 bind_arguments={"key": 2, "key2": 2, "x-match": "any"}, 

274) 

275all_keys_queue = RabbitQueue( 

276 "test-all-keys-queue", 

277 auto_delete=True, 

278 bind_arguments={"key": 2, "key2": 2, "x-match": "all"}, 

279) 

280 

281broker = RabbitBroker() 

282 

283 

284@pytest.mark.rabbit() 

285@pytest.mark.parametrize( 

286 ( 

287 "queue", 

288 "exchange", 

289 "routing_key", 

290 "headers", 

291 "expected_result", 

292 ), 

293 ( 

294 pytest.param( 

295 reqular_queue, 

296 exch_direct, 

297 reqular_queue.routing(), 

298 {}, 

299 True, 

300 id="direct match", 

301 ), 

302 pytest.param( 

303 reqular_queue, 

304 exch_direct, 

305 "wrong key", 

306 {}, 

307 False, 

308 id="direct mismatch", 

309 ), 

310 pytest.param( 

311 reqular_queue, 

312 exch_fanout, 

313 "", 

314 {}, 

315 True, 

316 id="fanout match", 

317 ), 

318 pytest.param( 

319 routing_key_queue, 

320 exch_topic, 

321 "log.info", 

322 {}, 

323 True, 

324 id="topic match", 

325 ), 

326 pytest.param( 

327 routing_key_queue, 

328 exch_topic, 

329 "log.wrong", 

330 {}, 

331 False, 

332 id="topic mismatch", 

333 ), 

334 pytest.param( 

335 one_key_queue, 

336 exch_headers, 

337 "", 

338 {"key": 1}, 

339 True, 

340 id="one header match", 

341 ), 

342 pytest.param( 

343 one_key_queue, 

344 exch_headers, 

345 "", 

346 {"key": "wrong"}, 

347 False, 

348 id="one header mismatch", 

349 ), 

350 pytest.param( 

351 any_keys_queue, 

352 exch_headers, 

353 "", 

354 {"key2": 2}, 

355 True, 

356 id="any headers match", 

357 ), 

358 pytest.param( 

359 any_keys_queue, 

360 exch_headers, 

361 "", 

362 {"key2": "wrong"}, 

363 False, 

364 id="any headers mismatch", 

365 ), 

366 pytest.param( 

367 all_keys_queue, 

368 exch_headers, 

369 "", 

370 {"key": 2, "key2": 2}, 

371 True, 

372 id="all headers match", 

373 ), 

374 pytest.param( 

375 all_keys_queue, 

376 exch_headers, 

377 "", 

378 {"key": "wrong", "key2": 2}, 

379 False, 

380 id="all headers mismatch", 

381 ), 

382 ), 

383) 

384def test_in_memory_routing( 

385 queue: str, 

386 exchange: RabbitExchange, 

387 routing_key: str, 

388 headers: dict[str, Any], 

389 expected_result: bool, 

390) -> None: 

391 subscriber = broker.subscriber(queue, exchange) 

392 assert ( 

393 _is_handler_matches(subscriber, routing_key, headers, exchange) is expected_result 

394 )