Coverage for tests / asgi / testcase.py: 99%
238 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2import math
3from collections.abc import Callable
4from typing import Any
5from unittest.mock import AsyncMock, MagicMock
7import pytest
8from dirty_equals import Contains, IsFloat, IsList, IsPartialDict, IsStr
9from fast_depends import Depends
10from freezegun import freeze_time
11from starlette.applications import Starlette
12from starlette.routing import Mount
13from starlette.testclient import TestClient
14from starlette.websockets import WebSocketDisconnect
16from faststream._internal.context import Context
17from faststream.annotations import FastStream, Logger
18from faststream.asgi import (
19 AsgiFastStream,
20 AsgiResponse,
21 AsyncAPIRoute,
22 Request,
23 get,
24 make_asyncapi_asgi,
25 make_ping_asgi,
26 post,
27)
28from faststream.asgi.params import Header, Query
29from faststream.asgi.types import ASGIApp, Scope
30from faststream.specification import AsyncAPI
33class AsgiTestcase:
def get_broker(self) -> Any:
    """Return the broker under test; concrete test cases must override this."""
    raise NotImplementedError
def get_test_broker(self, broker: Any) -> Any:
    """Return the test wrapper for ``broker``; concrete test cases must override this."""
    raise NotImplementedError
@pytest.mark.asyncio()
async def test_not_found(self) -> None:
    """A GET on an app created without any routes is answered with 404."""
    broker = self.get_broker()
    asgi_app = AsgiFastStream(broker)

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            reply = http.get("/")
            assert reply.status_code == 404
@pytest.mark.asyncio()
async def test_ws_not_found(self) -> None:
    """Connecting a websocket to an app without routes raises WebSocketDisconnect."""
    broker = self.get_broker()
    asgi_app = AsgiFastStream(broker)

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            # The connection attempt itself is expected to raise.
            with pytest.raises(WebSocketDisconnect), http.websocket_connect("/ws"):
                pass
@pytest.mark.asyncio()
async def test_asgi_ping_healthy(self) -> None:
    """The ping route mounted at ``/health`` answers 204 under the test broker."""
    broker = self.get_broker()
    health_route = ("/health", make_ping_asgi(broker, timeout=5.0))
    asgi_app = AsgiFastStream(broker, asgi_routes=[health_route])

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            reply = http.get("/health")
            assert reply.status_code == 204
@pytest.mark.asyncio()
async def test_asgi_ping_unhealthy(self) -> None:
    """The ping route answers 500 when the broker's ``ping`` resolves to False."""
    broker = self.get_broker()
    health_route = ("/health", make_ping_asgi(broker, timeout=5.0))
    asgi_app = AsgiFastStream(broker, asgi_routes=[health_route])

    async with self.get_test_broker(broker) as br:
        # Replace ping with an async stub that always reports failure.
        br.ping = AsyncMock(return_value=False)

        with TestClient(asgi_app) as http:
            reply = http.get("/health")
            assert reply.status_code == 500
@pytest.mark.asyncio()
async def test_asyncapi_asgi(self) -> None:
    """With a specification and ``asyncapi_path`` set, ``/docs`` returns 200 and a non-empty body."""
    broker = self.get_broker()
    asgi_app = AsgiFastStream(broker, specification=AsyncAPI(), asyncapi_path="/docs")

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            reply = http.get("/docs")
            assert reply.status_code == 200, reply
            assert reply.text
@pytest.mark.asyncio()
async def test_asyncapi_asgi_if_broker_set_by_method(self) -> None:
    """``/docs`` is still served when the broker is attached via ``set_broker``."""
    broker = self.get_broker()

    # The app is built without a broker; it is attached afterwards.
    asgi_app = AsgiFastStream(specification=AsyncAPI(), asyncapi_path="/docs")
    asgi_app.set_broker(broker)

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            reply = http.get("/docs")
            assert reply.status_code == 200, reply
            assert reply.text
@pytest.mark.asyncio()
@pytest.mark.parametrize(
    ("decorator", "client_method"),
    (
        pytest.param(get, "get", id="get"),
        pytest.param(post, "post", id="post"),
    ),
)
async def test_decorators(
    self, decorator: Callable[..., ASGIApp], client_method: str
) -> None:
    """A handler wrapped by ``get``/``post`` serves its response for the matching HTTP method."""
    broker = self.get_broker()

    @decorator
    async def some_handler(scope: Scope) -> AsgiResponse:
        return AsgiResponse(body=b"test", status_code=200)

    asgi_app = AsgiFastStream(broker, asgi_routes=[("/test", some_handler)])

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            send = getattr(http, client_method)
            reply = send("/test")
            assert reply.status_code == 200
            assert reply.text == "test"
@pytest.mark.asyncio()
@pytest.mark.parametrize(
    ("decorator", "client_method"),
    (
        pytest.param(get, "get", id="get"),
        pytest.param(post, "post", id="post"),
    ),
)
async def test_context_injected(
    self, decorator: Callable[..., ASGIApp], client_method: str
) -> None:
    """Request, logger and app are injected into the handler; the body echoes their class names."""

    @decorator
    async def some_handler(
        request: Request, logger: Logger, app: FastStream
    ) -> AsgiResponse:
        # Report the class name of every injected object, space separated.
        injected = (request, logger, app)
        text = " ".join(obj.__class__.__name__ for obj in injected)
        return AsgiResponse(body=text.encode(), status_code=200)

    broker = self.get_broker()
    asgi_app = AsgiFastStream(broker, asgi_routes=[("/test", some_handler)])

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            send = getattr(http, client_method)
            reply = send("/test")
            assert reply.status_code == 200
            assert reply.text == "AsgiRequest Logger AsgiFastStream"
@pytest.mark.asyncio()
@pytest.mark.parametrize(
    ("decorator", "client_method"),
    (
        pytest.param(get, "get", id="get"),
        pytest.param(post, "post", id="post"),
    ),
)
async def test_fast_depends_injected(
    self, decorator: Callable[..., ASGIApp], client_method: str
) -> None:
    """A ``Depends`` default on the handler is resolved and its value is returned as the body."""
    broker = self.get_broker()

    def get_string() -> str:
        return "test"

    @decorator
    async def some_handler(string=Depends(get_string)) -> AsgiResponse:  # noqa: B008
        return AsgiResponse(body=string.encode(), status_code=200)

    asgi_app = AsgiFastStream(broker, asgi_routes=[("/test", some_handler)])

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            send = getattr(http, client_method)
            reply = send("/test")
            assert reply.status_code == 200
            assert reply.text == "test"
@pytest.mark.asyncio()
@pytest.mark.parametrize(
    "dependency",
    (
        pytest.param(Query(), id="query"),
        pytest.param(Header(), id="header"),
    ),
)
async def test_validation_error_handled(self, dependency: Context) -> None:
    """A request that omits the declared query/header parameter gets 422 "Validation error"."""
    broker = self.get_broker()

    @get
    async def some_handler(dep=dependency) -> AsgiResponse:
        return AsgiResponse(status_code=200)

    asgi_app = AsgiFastStream(broker, asgi_routes=[("/test", some_handler)])

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            # The request carries neither a query string nor extra headers.
            reply = http.get("/test")
            assert reply.status_code == 422
            assert reply.text == "Validation error"
def test_asyncapi_pure_asgi(self) -> None:
    """The AsyncAPI ASGI app mounted in a Starlette app serves an HTML document."""
    broker = self.get_broker()
    docs_app = make_asyncapi_asgi(AsyncAPI(broker))
    starlette_app = Starlette(routes=[Mount("/", docs_app)])

    with TestClient(starlette_app) as http:
        reply = http.get("/")
        assert reply.status_code == 200
        assert reply.text == Contains("<!DOCTYPE html>")
239 # ===== TryItOut tests =====
@pytest.mark.asyncio()
async def test_try_it_out_message_delivered_to_subscriber(
    self, queue: str, mock: MagicMock
) -> None:
    """A dict posted to the try-it-out endpoint is handed to the subscriber of that channel."""
    broker = self.get_broker()

    @broker.subscriber(queue)
    async def handler(msg: Any) -> None:
        mock(msg)

    docs_route = AsyncAPIRoute("/asyncapi", try_it_out=True)
    asgi_app = AsgiFastStream(broker, asyncapi_path=docs_route)

    payload = {
        "channelName": queue,
        "message": {
            "operation_id": "op",
            "operation_type": "subscribe",
            "message": {"text": "hello"},
        },
        "options": {"sendToRealBroker": False},
    }

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            http.post("/asyncapi/try", json=payload)

        mock.assert_called_once_with(IsPartialDict(text="hello"))
@pytest.mark.asyncio()
async def test_try_it_out_string_payload_delivered(
    self, queue: str, mock: MagicMock
) -> None:
    """A plain string nested under ``message.message`` reaches the subscriber unchanged."""
    broker = self.get_broker()

    @broker.subscriber(queue)
    async def handler(msg: Any) -> None:
        mock(msg)

    asgi_app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

    payload = {
        "channelName": queue,
        "message": {
            "operation_id": "op",
            "operation_type": "subscribe",
            "message": "hello",
        },
        "options": {"sendToRealBroker": False},
    }

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            http.post("/asyncapi/try", json=payload)

        mock.assert_called_once_with("hello")
@pytest.mark.asyncio()
async def test_try_it_out_integer_payload_delivered(
    self, queue: str, mock: MagicMock
) -> None:
    """An integer payload is delivered to an ``int``-typed subscriber as 42."""
    broker = self.get_broker()

    @broker.subscriber(queue)
    async def handler(msg: int) -> None:
        mock(msg)

    asgi_app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

    payload = {
        "channelName": queue,
        "message": {
            "operation_id": "op",
            "operation_type": "subscribe",
            "message": 42,
        },
        "options": {"sendToRealBroker": False},
    }

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            http.post("/asyncapi/try", json=payload)

        mock.assert_called_once_with(42)
@pytest.mark.asyncio()
async def test_try_it_out_float_payload_delivered(
    self, queue: str, mock: MagicMock
) -> None:
    """A float payload is delivered to a ``float``-typed subscriber, compared approximately."""
    broker = self.get_broker()

    @broker.subscriber(queue)
    async def handler(msg: float) -> None:
        mock(msg)

    asgi_app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

    payload = {
        "channelName": queue,
        "message": {
            "operation_id": "op",
            "operation_type": "subscribe",
            "message": math.pi,
        },
        "options": {"sendToRealBroker": False},
    }

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            http.post("/asyncapi/try", json=payload)

        mock.assert_called_once_with(IsFloat(approx=math.pi))
@pytest.mark.asyncio()
async def test_try_it_out_boolean_payload_delivered(
    self, queue: str, mock: MagicMock
) -> None:
    """A boolean payload is delivered to a ``bool``-typed subscriber as True."""
    broker = self.get_broker()

    @broker.subscriber(queue)
    async def handler(msg: bool) -> None:
        mock(msg)

    asgi_app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

    payload = {
        "channelName": queue,
        "message": {
            "operation_id": "op",
            "operation_type": "subscribe",
            "message": True,
        },
        "options": {"sendToRealBroker": False},
    }

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            http.post("/asyncapi/try", json=payload)

        mock.assert_called_once_with(True)
@pytest.mark.asyncio()
async def test_try_it_out_array_payload_delivered(
    self, queue: str, mock: MagicMock
) -> None:
    """A JSON array payload is delivered to a list-typed subscriber with its items in order."""
    broker = self.get_broker()

    @broker.subscriber(queue)
    async def handler(msg: list[Any]) -> None:
        mock(msg)

    asgi_app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

    payload = {
        "channelName": queue,
        "message": {
            "operation_id": "op",
            "operation_type": "subscribe",
            "message": ["one", "two", "three"],
        },
        "options": {"sendToRealBroker": False},
    }

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            http.post("/asyncapi/try", json=payload)

        mock.assert_called_once_with(IsList("one", "two", "three"))
@pytest.mark.asyncio()
async def test_try_it_out_object_payload_delivered(
    self, queue: str, mock: MagicMock
) -> None:
    """A JSON object payload is delivered to a dict-typed subscriber with its fields intact."""
    broker = self.get_broker()

    @broker.subscriber(queue)
    async def handler(msg: dict[str, Any]) -> None:
        mock(msg)

    asgi_app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

    payload = {
        "channelName": queue,
        "message": {
            "operation_id": "op",
            "operation_type": "subscribe",
            "message": {"field": "value", "count": 42, "mock": True},
        },
        "options": {"sendToRealBroker": False},
    }

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            http.post("/asyncapi/try", json=payload)

        mock.assert_called_once_with(IsPartialDict(field="value", count=42, mock=True))
@pytest.mark.asyncio()
async def test_try_it_out_memory_subsricber_returns_result(self, queue: str) -> None:
    """The subscriber's return value comes back as the JSON body of a 200 response."""
    broker = self.get_broker()

    @broker.subscriber(queue)
    async def handler(msg: dict[str, Any]) -> dict[str, Any]:
        return {"result": 1}

    asgi_app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

    payload = {
        "channelName": queue,
        "message": {
            "operation_id": "op",
            "operation_type": "subscribe",
            "message": {"data": "hello"},
        },
        "options": {"sendToRealBroker": False},
    }

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            reply = http.post("/asyncapi/try", json=payload)
            assert reply.status_code == 200, reply.json()
            assert reply.json() == IsPartialDict(result=1)
@pytest.mark.asyncio()
async def test_try_it_out_disabled(self, queue: str) -> None:
    """With ``try_it_out=False`` the ``/asyncapi/try`` endpoint answers 404."""
    broker = self.get_broker()
    docs_route = AsyncAPIRoute("/asyncapi", try_it_out=False)
    asgi_app = AsgiFastStream(broker, asyncapi_path=docs_route)

    payload = {
        "channelName": queue,
        "message": {
            "operation_id": "op",
            "operation_type": "subscribe",
            "message": {"text": "hello"},
        },
        "options": {"sendToRealBroker": False},
    }

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            reply = http.post("/asyncapi/try", json=payload)
            assert reply.status_code == 404
@pytest.mark.asyncio()
async def test_try_it_out_missing_channel_returns_400(self) -> None:
    """A try-it-out body without ``channelName`` is rejected with 400 "Missing channelName"."""
    broker = self.get_broker()
    asgi_app = AsgiFastStream(broker, asyncapi_path="/docs")

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            reply = http.post("/docs/try", json={"message": {}})
            assert reply.status_code == 400
            assert reply.json() == IsPartialDict(details="Missing channelName")
@pytest.mark.asyncio()
async def test_try_it_out_channel_not_found(self, queue: str) -> None:
    """Targeting a channel with no subscriber yields 404 and a "destination not found" detail."""
    broker = self.get_broker()
    asgi_app = AsgiFastStream(broker, asyncapi_path="/docs")

    payload = {
        "channelName": queue,
        "message": {
            "operation_id": "op",
            "operation_type": "subscribe",
            "message": {"text": "hello"},
        },
        "options": {"sendToRealBroker": False},
    }
    expected_details = IsStr(regex=r".+ destination not found\.")

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            reply = http.post("/docs/try", json=payload)
            assert reply.status_code == 404, reply.status_code
            assert reply.json() == IsPartialDict(details=expected_details)
@pytest.mark.asyncio()
async def test_try_it_out_path_follows_asyncapi_path(self, queue: str) -> None:
    """The try-it-out endpoint lives under the configured docs path (``/custom/docs/try``)."""
    broker = self.get_broker()

    @broker.subscriber(queue)
    async def handler(msg: dict[str, Any]) -> None:
        pass

    asgi_app = AsgiFastStream(broker, asyncapi_path="/custom/docs")

    payload = {
        "channelName": queue,
        "message": {
            "operation_id": "op",
            "operation_type": "subscribe",
            "message": {},
        },
        "options": {"sendToRealBroker": False},
    }

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            reply = http.post("/custom/docs/try", json=payload)

        assert reply.status_code == 200
        assert reply.json() == "ok"
@pytest.mark.asyncio()
@freeze_time(auto_tick_seconds=5)
async def test_try_it_out_subscriber_completes_within_timeout(
    self, queue: str
) -> None:
    """A subscriber sleeping 5 seconds still produces a 200 response carrying its result."""
    broker = self.get_broker()

    @broker.subscriber(queue)
    async def handler(msg: Any) -> dict[str, Any]:
        await asyncio.sleep(5)
        return {"result": "done"}

    asgi_app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

    payload = {
        "channelName": queue,
        "message": {
            "operation_id": "op",
            "operation_type": "subscribe",
            "message": {"data": "hello"},
        },
        "options": {"sendToRealBroker": False},
    }

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            reply = http.post("/asyncapi/try", json=payload)

        assert reply.status_code == 200, reply.json()
        assert reply.json() == IsPartialDict(result="done")
@pytest.mark.asyncio()
@freeze_time(auto_tick_seconds=30)
async def test_try_it_out_subscriber_exceeds_timeout_returns_500(
    self, queue: str
) -> None:
    """A subscriber sleeping 30 seconds results in a 500 whose details mention TimeoutError."""
    broker = self.get_broker()

    @broker.subscriber(queue)
    async def handler(msg: Any) -> None:
        await asyncio.sleep(30)

    asgi_app = AsgiFastStream(broker, asyncapi_path="/asyncapi")

    payload = {
        "channelName": queue,
        "message": {
            "operation_id": "op",
            "operation_type": "subscribe",
            "message": {"data": "hello"},
        },
        "options": {"sendToRealBroker": False},
    }

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            reply = http.post("/asyncapi/try", json=payload)

        assert reply.status_code == 500
        assert reply.json() == IsPartialDict(details=Contains("TimeoutError"))
@pytest.mark.asyncio()
async def test_try_it_out_spec_endpoint_base_overrides_route_default(self) -> None:
    """A custom ``try_it_out_url`` on the route shows up in the rendered docs page."""
    broker = self.get_broker()
    docs_route = AsyncAPIRoute(
        "/docs",
        try_it_out_url="https://api.example.com/try",
    )
    asgi_app = AsgiFastStream(broker, asyncapi_path=docs_route)

    async with self.get_test_broker(broker):
        with TestClient(asgi_app) as http:
            reply = http.get("/docs")
            assert reply.status_code == 200
            assert reply.text == Contains("https://api.example.com/try")