Coverage for faststream / asgi / request.py: 76%
54 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import json
2from collections.abc import AsyncGenerator
3from typing import TYPE_CHECKING, Any, cast
4from urllib.parse import parse_qs
6from faststream._internal.constants import EMPTY
8if TYPE_CHECKING:
9 from .types import Receive, Scope, Send
class ClientDisconnectError(Exception):
    """Raised when the client disconnects while the request body is read.

    ``AsgiRequest.stream`` raises it on receiving an ``http.disconnect``
    message.
    """
class AsgiRequest:
    """Convenience wrapper around the ASGI ``scope``/``receive``/``send`` triple.

    The request body, its JSON decoding, the parsed query string and the
    decoded headers are each computed on first access and cached on the
    instance afterwards.
    """

    def __init__(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
        """Store the ASGI callables and reset every lazily computed cache."""
        self._scope = scope
        self._receive = receive
        self._send = send
        self._stream_consumed = False
        # Flipped to True by ``stream()`` on ``http.disconnect``. Initialised
        # here so the attribute always exists, not only after a disconnect.
        self._is_disconnected = False
        self._body: bytes | None = None
        # ``EMPTY`` (not ``None``) marks "not decoded yet", because ``None``
        # is a legitimate decoded JSON value (``null``).
        self._json: Any = EMPTY
        self._query_params: dict[str, list[str]] | None = None
        self._headers: dict[str, str] | None = None

    async def stream(self) -> AsyncGenerator[bytes, None]:
        """Yield the request body chunk by chunk, ending with ``b""``.

        If the body has already been cached by ``body()``, the cached bytes
        are replayed instead of calling ``receive`` again.

        Raises:
            RuntimeError: the stream was already fully read and no cached
                body is available to replay.
            ClientDisconnectError: an ``http.disconnect`` message arrived
                before the body was complete.
        """
        if self._body is not None:
            yield self._body
            yield b""
            return

        if self._stream_consumed:
            msg = "Stream consumed"
            raise RuntimeError(msg)

        while not self._stream_consumed:
            message = await self._receive()

            if message["type"] == "http.request":
                body = message.get("body", b"")
                # A missing ``more_body`` key is treated as "last chunk".
                if not message.get("more_body", False):
                    self._stream_consumed = True
                if body:
                    yield body

            elif message["type"] == "http.disconnect":  # pragma: no branch
                self._is_disconnected = True
                raise ClientDisconnectError

        # Empty terminator chunk marks the end of the stream.
        yield b""

    async def body(self) -> bytes:
        """Return the complete request body, reading the stream at most once."""
        if self._body is None:
            chunks: list[bytes] = [chunk async for chunk in self.stream()]
            self._body = b"".join(chunks)
        return self._body

    async def json(self) -> Any:
        """Return the request body decoded with ``json.loads`` (cached)."""
        if self._json is EMPTY:
            body = await self.body()
            self._json = json.loads(body)
        return self._json

    @property
    def query_params(self) -> dict[str, list[str]]:
        """Query string parsed into ``{name: [values, ...]}`` (cached).

        The raw ``query_string`` bytes from the scope (default ``b""``) are
        parsed with ``urllib.parse.parse_qs``; names and values are then
        decoded as latin-1.
        """
        if self._query_params is None:
            query_string = self._scope.get("query_string", b"")
            self._query_params = {
                key.decode("latin-1"): [v.decode("latin-1") for v in value]
                for key, value in parse_qs(query_string).items()
            }
        return self._query_params

    @property
    def headers(self) -> dict[str, str]:
        """Request headers as ``{lowercased-name: value}`` (cached).

        Names and values are decoded as latin-1. If a header name occurs
        more than once, the last occurrence wins because the result is a
        plain dict.
        """
        if self._headers is None:
            self._headers = {
                key.lower().decode("latin-1"): value.decode("latin-1")
                for key, value in self._scope["headers"]
            }
        return self._headers

    @property
    def method(self) -> str:
        """HTTP method taken verbatim from ``scope["method"]``."""
        return cast("str", self._scope["method"])