Coverage for faststream / asgi / request.py: 76%

54 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import json 

2from collections.abc import AsyncGenerator 

3from typing import TYPE_CHECKING, Any, cast 

4from urllib.parse import parse_qs 

5 

6from faststream._internal.constants import EMPTY 

7 

8if TYPE_CHECKING: 

9 from .types import Receive, Scope, Send 

10 

11 

12class ClientDisconnectError(Exception): ... 

13 

14 

15class AsgiRequest: 

16 def __init__(self, scope: "Scope", receive: "Receive", send: "Send") -> None: 

17 self._scope = scope 

18 self._receive = receive 

19 self._send = send 

20 self._stream_consumed = False 

21 self._body: bytes | None = None 

22 self._json: Any = EMPTY 

23 self._query_params: dict[str, list[str]] | None = None 

24 self._headers: dict[str, str] | None = None 

25 

26 async def stream(self) -> AsyncGenerator[bytes, None]: 

27 if self._body is not None: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true

28 yield self._body 

29 yield b"" 

30 return 

31 if self._stream_consumed: 31 ↛ 32line 31 didn't jump to line 32 because the condition on line 31 was never true

32 msg = "Stream consumed" 

33 raise RuntimeError(msg) 

34 while not self._stream_consumed: 

35 message = await self._receive() 

36 if message["type"] == "http.request": 36 ↛ 42line 36 didn't jump to line 42 because the condition on line 36 was always true

37 body = message.get("body", b"") 

38 if not message.get("more_body", False): 38 ↛ 40line 38 didn't jump to line 40 because the condition on line 38 was always true

39 self._stream_consumed = True 

40 if body: 40 ↛ 34line 40 didn't jump to line 34 because the condition on line 40 was always true

41 yield body 

42 elif message["type"] == "http.disconnect": # pragma: no branch 

43 self._is_disconnected = True 

44 raise ClientDisconnectError 

45 yield b"" 

46 

47 async def body(self) -> bytes: 

48 if self._body is None: 48 ↛ 51line 48 didn't jump to line 51 because the condition on line 48 was always true

49 chunks: list[bytes] = [chunk async for chunk in self.stream()] 

50 self._body = b"".join(chunks) 

51 return self._body 

52 

53 async def json(self) -> Any: 

54 if self._json is EMPTY: 54 ↛ 57line 54 didn't jump to line 57 because the condition on line 54 was always true

55 body = await self.body() 

56 self._json = json.loads(body) 

57 return self._json 

58 

59 @property 

60 def query_params(self) -> dict[str, list[str]]: 

61 if self._query_params is None: 61 ↛ 67line 61 didn't jump to line 67 because the condition on line 61 was always true

62 query_string = self._scope.get("query_string", b"") 

63 self._query_params = { 

64 key.decode("latin-1"): [v.decode("latin-1") for v in value] 

65 for key, value in parse_qs(query_string).items() 

66 } 

67 return self._query_params 

68 

69 @property 

70 def headers(self) -> dict[str, str]: 

71 if self._headers is None: 71 ↛ 76line 71 didn't jump to line 76 because the condition on line 71 was always true

72 self._headers = { 

73 key.lower().decode("latin-1"): value.decode("latin-1") 

74 for key, value in list(self._scope["headers"]) 

75 } 

76 return self._headers 

77 

78 @property 

79 def method(self) -> str: 

80 return cast("str", self._scope["method"])