Coverage for tests / asgi / testcase.py: 99%

238 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2import math 

3from collections.abc import Callable 

4from typing import Any 

5from unittest.mock import AsyncMock, MagicMock 

6 

7import pytest 

8from dirty_equals import Contains, IsFloat, IsList, IsPartialDict, IsStr 

9from fast_depends import Depends 

10from freezegun import freeze_time 

11from starlette.applications import Starlette 

12from starlette.routing import Mount 

13from starlette.testclient import TestClient 

14from starlette.websockets import WebSocketDisconnect 

15 

16from faststream._internal.context import Context 

17from faststream.annotations import FastStream, Logger 

18from faststream.asgi import ( 

19 AsgiFastStream, 

20 AsgiResponse, 

21 AsyncAPIRoute, 

22 Request, 

23 get, 

24 make_asyncapi_asgi, 

25 make_ping_asgi, 

26 post, 

27) 

28from faststream.asgi.params import Header, Query 

29from faststream.asgi.types import ASGIApp, Scope 

30from faststream.specification import AsyncAPI 

31 

32 

class AsgiTestcase:
    """Broker-agnostic ASGI test suite.

    The two hook methods below raise ``NotImplementedError`` and are meant to
    be overridden. Each test wraps the broker in an ``AsgiFastStream`` app
    (or a Starlette mount) and drives it through Starlette's ``TestClient``.
    """

    def get_broker(self) -> Any:
        """Return the broker instance under test (must be overridden)."""
        raise NotImplementedError

    def get_test_broker(self, broker: Any) -> Any:
        """Return the async context manager used to run ``broker`` in tests.

        Must be overridden; the tests enter the result with ``async with``.
        """
        raise NotImplementedError

    @staticmethod
    def _try_it_out_body(channel: str, payload: Any) -> dict[str, Any]:
        """Build the JSON body posted to the try-it-out endpoint.

        Args:
            channel: Value sent as ``channelName``.
            payload: Value nested under ``message.message``.

        Returns:
            The request body; ``sendToRealBroker`` is always ``False``.
        """
        return {
            "channelName": channel,
            "message": {
                "operation_id": "op",
                "operation_type": "subscribe",
                "message": payload,
            },
            "options": {"sendToRealBroker": False},
        }

    @pytest.mark.asyncio()
    async def test_not_found(self) -> None:
        """An app without routes answers ``GET /`` with 404."""
        broker = self.get_broker()
        app = AsgiFastStream(broker)

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.get("/")
                assert response.status_code == 404

    @pytest.mark.asyncio()
    async def test_ws_not_found(self) -> None:
        """A websocket connection to an unknown path is disconnected."""
        broker = self.get_broker()

        app = AsgiFastStream(broker)

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                # raises error
                with pytest.raises(WebSocketDisconnect), client.websocket_connect(
                    "/ws"
                ):
                    pass

    @pytest.mark.asyncio()
    async def test_asgi_ping_healthy(self) -> None:
        """The ping route answers 204 while the test broker is running."""
        broker = self.get_broker()

        app = AsgiFastStream(
            broker,
            asgi_routes=[("/health", make_ping_asgi(broker, timeout=5.0))],
        )

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.get("/health")
                assert response.status_code == 204

    @pytest.mark.asyncio()
    async def test_asgi_ping_unhealthy(self) -> None:
        """The ping route answers 500 when ``broker.ping`` resolves to False."""
        broker = self.get_broker()

        app = AsgiFastStream(
            broker,
            asgi_routes=[
                ("/health", make_ping_asgi(broker, timeout=5.0)),
            ],
        )
        async with self.get_test_broker(broker) as br:
            # Force the health check to see a failed ping.
            br.ping = AsyncMock(return_value=False)

            with TestClient(app) as client:
                response = client.get("/health")
                assert response.status_code == 500

    @pytest.mark.asyncio()
    async def test_asyncapi_asgi(self) -> None:
        """The AsyncAPI page is served at ``asyncapi_path``."""
        broker = self.get_broker()

        app = AsgiFastStream(
            broker,
            specification=AsyncAPI(),
            asyncapi_path="/docs",
        )

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.get("/docs")
                assert response.status_code == 200, response
                assert response.text

    @pytest.mark.asyncio()
    async def test_asyncapi_asgi_if_broker_set_by_method(self) -> None:
        """The AsyncAPI page also works when the broker is attached later."""
        broker = self.get_broker()

        app = AsgiFastStream(
            specification=AsyncAPI(),
            asyncapi_path="/docs",
        )

        app.set_broker(broker)

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.get("/docs")
                assert response.status_code == 200, response
                assert response.text

    @pytest.mark.asyncio()
    @pytest.mark.parametrize(
        ("decorator", "client_method"),
        (
            pytest.param(get, "get", id="get"),
            pytest.param(post, "post", id="post"),
        ),
    )
    async def test_decorators(
        self, decorator: Callable[..., ASGIApp], client_method: str
    ) -> None:
        """A handler wrapped by ``get``/``post`` serves the matching method."""

        @decorator
        async def some_handler(scope: Scope) -> AsgiResponse:
            return AsgiResponse(body=b"test", status_code=200)

        broker = self.get_broker()
        app = AsgiFastStream(broker, asgi_routes=[("/test", some_handler)])

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = getattr(client, client_method)("/test")
                assert response.status_code == 200
                assert response.text == "test"

    @pytest.mark.asyncio()
    @pytest.mark.parametrize(
        ("decorator", "client_method"),
        (
            pytest.param(get, "get", id="get"),
            pytest.param(post, "post", id="post"),
        ),
    )
    async def test_context_injected(
        self, decorator: Callable[..., ASGIApp], client_method: str
    ) -> None:
        """Request, logger and app are injected into the handler by annotation."""

        @decorator
        async def some_handler(
            request: Request, logger: Logger, app: FastStream
        ) -> AsgiResponse:
            return AsgiResponse(
                body=f"{request.__class__.__name__} {logger.__class__.__name__} {app.__class__.__name__}".encode(),
                status_code=200,
            )

        broker = self.get_broker()
        app = AsgiFastStream(broker, asgi_routes=[("/test", some_handler)])

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = getattr(client, client_method)("/test")
                assert response.status_code == 200
                assert response.text == "AsgiRequest Logger AsgiFastStream"

    @pytest.mark.asyncio()
    @pytest.mark.parametrize(
        ("decorator", "client_method"),
        (
            pytest.param(get, "get", id="get"),
            pytest.param(post, "post", id="post"),
        ),
    )
    async def test_fast_depends_injected(
        self, decorator: Callable[..., ASGIApp], client_method: str
    ) -> None:
        """A ``Depends`` default is resolved before the handler runs."""

        def get_string() -> str:
            return "test"

        @decorator
        async def some_handler(string=Depends(get_string)) -> AsgiResponse:  # noqa: B008
            return AsgiResponse(body=string.encode(), status_code=200)

        broker = self.get_broker()
        app = AsgiFastStream(broker, asgi_routes=[("/test", some_handler)])

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = getattr(client, client_method)("/test")
                assert response.status_code == 200
                assert response.text == "test"

    @pytest.mark.asyncio()
    @pytest.mark.parametrize(
        "dependency",
        (
            pytest.param(Query(), id="query"),
            pytest.param(Header(), id="header"),
        ),
    )
    async def test_validation_error_handled(self, dependency: Context) -> None:
        """A request lacking the declared query/header value yields 422."""

        @get
        async def some_handler(dep=dependency) -> AsgiResponse:
            return AsgiResponse(status_code=200)

        broker = self.get_broker()
        app = AsgiFastStream(broker, asgi_routes=[("/test", some_handler)])

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.get("/test")
                assert response.status_code == 422
                assert response.text == "Validation error"

    def test_asyncapi_pure_asgi(self) -> None:
        """``make_asyncapi_asgi`` can be mounted in a plain Starlette app."""
        broker = self.get_broker()

        app = Starlette(routes=[Mount("/", make_asyncapi_asgi(AsyncAPI(broker)))])

        with TestClient(app) as client:
            response = client.get("/")
            assert response.status_code == 200
            assert response.text == Contains("<!DOCTYPE html>")

    # ===== TryItOut tests =====
    @pytest.mark.asyncio()
    async def test_try_it_out_message_delivered_to_subscriber(
        self, queue: str, mock: MagicMock
    ) -> None:
        """A try-it-out POST reaches the subscriber registered on the channel."""
        broker = self.get_broker()

        @broker.subscriber(queue)
        async def handler(msg: Any) -> None:
            mock(msg)

        app = AsgiFastStream(
            broker,
            asyncapi_path=AsyncAPIRoute("/asyncapi", try_it_out=True),
        )

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post(
                    "/asyncapi/try",
                    json=self._try_it_out_body(queue, {"text": "hello"}),
                )
                # Surface an HTTP-level failure before the delivery check.
                assert response.status_code == 200, response.text

        mock.assert_called_once_with(IsPartialDict(text="hello"))

    @pytest.mark.asyncio()
    async def test_try_it_out_string_payload_delivered(
        self, queue: str, mock: MagicMock
    ) -> None:
        """Plugin wraps primitive payloads in message.message — ensure they arrive correctly."""
        broker = self.get_broker()

        @broker.subscriber(queue)
        async def handler(msg: Any) -> None:
            mock(msg)

        app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post(
                    "/asyncapi/try",
                    json=self._try_it_out_body(queue, "hello"),
                )
                # Surface an HTTP-level failure before the delivery check.
                assert response.status_code == 200, response.text

        mock.assert_called_once_with("hello")

    @pytest.mark.asyncio()
    async def test_try_it_out_integer_payload_delivered(
        self, queue: str, mock: MagicMock
    ) -> None:
        """Primitive int payload should arrive correctly."""
        broker = self.get_broker()

        @broker.subscriber(queue)
        async def handler(msg: int) -> None:
            mock(msg)

        app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post(
                    "/asyncapi/try",
                    json=self._try_it_out_body(queue, 42),
                )
                # Surface an HTTP-level failure before the delivery check.
                assert response.status_code == 200, response.text

        mock.assert_called_once_with(42)

    @pytest.mark.asyncio()
    async def test_try_it_out_float_payload_delivered(
        self, queue: str, mock: MagicMock
    ) -> None:
        """Primitive float payload should arrive correctly."""
        broker = self.get_broker()

        @broker.subscriber(queue)
        async def handler(msg: float) -> None:
            mock(msg)

        app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post(
                    "/asyncapi/try",
                    json=self._try_it_out_body(queue, math.pi),
                )
                # Surface an HTTP-level failure before the delivery check.
                assert response.status_code == 200, response.text

        mock.assert_called_once_with(IsFloat(approx=math.pi))

    @pytest.mark.asyncio()
    async def test_try_it_out_boolean_payload_delivered(
        self, queue: str, mock: MagicMock
    ) -> None:
        """Primitive boolean payload should arrive correctly."""
        broker = self.get_broker()

        @broker.subscriber(queue)
        async def handler(msg: bool) -> None:
            mock(msg)

        app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post(
                    "/asyncapi/try",
                    json=self._try_it_out_body(queue, True),
                )
                # Surface an HTTP-level failure before the delivery check.
                assert response.status_code == 200, response.text

        mock.assert_called_once_with(True)

    @pytest.mark.asyncio()
    async def test_try_it_out_array_payload_delivered(
        self, queue: str, mock: MagicMock
    ) -> None:
        """Array payload should arrive correctly."""
        broker = self.get_broker()

        @broker.subscriber(queue)
        async def handler(msg: list[Any]) -> None:
            mock(msg)

        app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post(
                    "/asyncapi/try",
                    json=self._try_it_out_body(queue, ["one", "two", "three"]),
                )
                # Surface an HTTP-level failure before the delivery check.
                assert response.status_code == 200, response.text

        mock.assert_called_once_with(IsList("one", "two", "three"))

    @pytest.mark.asyncio()
    async def test_try_it_out_object_payload_delivered(
        self, queue: str, mock: MagicMock
    ) -> None:
        """Object (dict) payload should arrive correctly."""
        broker = self.get_broker()

        @broker.subscriber(queue)
        async def handler(msg: dict[str, Any]) -> None:
            mock(msg)

        app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post(
                    "/asyncapi/try",
                    json=self._try_it_out_body(
                        queue, {"field": "value", "count": 42, "mock": True}
                    ),
                )
                # Surface an HTTP-level failure before the delivery check.
                assert response.status_code == 200, response.text

        mock.assert_called_once_with(
            IsPartialDict(field="value", count=42, mock=True)
        )

    @pytest.mark.asyncio()
    async def test_try_it_out_memory_subsricber_returns_result(
        self, queue: str
    ) -> None:
        """The subscriber's return value is sent back in the HTTP response."""
        broker = self.get_broker()

        @broker.subscriber(queue)
        async def handler(msg: dict[str, Any]) -> dict[str, Any]:
            return {"result": 1}

        app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post(
                    "/asyncapi/try",
                    json=self._try_it_out_body(queue, {"data": "hello"}),
                )
                assert response.status_code == 200, response.json()
                assert response.json() == IsPartialDict(result=1)

    @pytest.mark.asyncio()
    async def test_try_it_out_disabled(self, queue: str) -> None:
        """With ``try_it_out=False`` the try endpoint answers 404."""
        broker = self.get_broker()

        app = AsgiFastStream(
            broker,
            asyncapi_path=AsyncAPIRoute("/asyncapi", try_it_out=False),
        )

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post(
                    "/asyncapi/try",
                    json=self._try_it_out_body(queue, {"text": "hello"}),
                )
                assert response.status_code == 404

    @pytest.mark.asyncio()
    async def test_try_it_out_missing_channel_returns_400(self) -> None:
        """A body without ``channelName`` is rejected with 400."""
        broker = self.get_broker()

        app = AsgiFastStream(broker, asyncapi_path="/docs")

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post("/docs/try", json={"message": {}})
                assert response.status_code == 400
                assert response.json() == IsPartialDict(details="Missing channelName")

    @pytest.mark.asyncio()
    async def test_try_it_out_channel_not_found(self, queue: str) -> None:
        """Posting to a channel that has no subscriber answers 404."""
        broker = self.get_broker()

        app = AsgiFastStream(broker, asyncapi_path="/docs")

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post(
                    "/docs/try",
                    json=self._try_it_out_body(queue, {"text": "hello"}),
                )
                assert response.status_code == 404, response.status_code
                assert response.json() == IsPartialDict(
                    details=IsStr(regex=r".+ destination not found\.")
                )

    @pytest.mark.asyncio()
    async def test_try_it_out_path_follows_asyncapi_path(self, queue: str) -> None:
        """The try endpoint lives at ``<asyncapi_path>/try``."""
        broker = self.get_broker()

        @broker.subscriber(queue)
        async def handler(msg: dict[str, Any]) -> None:
            pass

        app = AsgiFastStream(broker, asyncapi_path="/custom/docs")

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post(
                    "/custom/docs/try",
                    json=self._try_it_out_body(queue, {}),
                )

        assert response.status_code == 200
        assert response.json() == "ok"

    @pytest.mark.asyncio()
    @freeze_time(auto_tick_seconds=5)
    async def test_try_it_out_subscriber_completes_within_timeout(
        self, queue: str
    ) -> None:
        """Subscriber that finishes before the configured timeout should return a 200 with its result."""
        broker = self.get_broker()

        @broker.subscriber(queue)
        async def handler(msg: Any) -> dict[str, Any]:
            await asyncio.sleep(5)
            return {"result": "done"}

        app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post(
                    "/asyncapi/try",
                    json=self._try_it_out_body(queue, {"data": "hello"}),
                )

        assert response.status_code == 200, response.json()
        assert response.json() == IsPartialDict(result="done")

    @pytest.mark.asyncio()
    @freeze_time(auto_tick_seconds=30)
    async def test_try_it_out_subscriber_exceeds_timeout_returns_500(
        self, queue: str
    ) -> None:
        """Subscriber that runs longer than the configured timeout should produce a 500 carrying the timeout exception."""
        broker = self.get_broker()

        @broker.subscriber(queue)
        async def handler(msg: Any) -> None:
            await asyncio.sleep(30)

        app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.post(
                    "/asyncapi/try",
                    json=self._try_it_out_body(queue, {"data": "hello"}),
                )

        assert response.status_code == 500
        assert response.json() == IsPartialDict(details=Contains("TimeoutError"))

    @pytest.mark.asyncio()
    async def test_try_it_out_spec_endpoint_base_overrides_route_default(self) -> None:
        """A custom ``try_it_out_url`` appears in the rendered docs page."""
        broker = self.get_broker()

        app = AsgiFastStream(
            broker,
            asyncapi_path=AsyncAPIRoute(
                "/docs",
                try_it_out_url="https://api.example.com/try",
            ),
        )

        async with self.get_test_broker(broker):
            with TestClient(app) as client:
                response = client.get("/docs")
                assert response.status_code == 200
                assert response.text == Contains("https://api.example.com/try")