Coverage for faststream / _internal / parser.py: 91%
11 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from abc import abstractmethod
2from collections.abc import Sequence
3from typing import TYPE_CHECKING, Any, Protocol, TypeVar
5from faststream.message.utils import decode_message, encode_message
7if TYPE_CHECKING:
8 from fast_depends.library.serializer import SerializerProto
10 from faststream._internal.basic_types import DecodedMessage, SendableMessage
11 from faststream.message import StreamMessage
13MsgType = TypeVar("MsgType")
16class ParserProto(Protocol[MsgType]):
17 """Protocol for parsing raw messages into StreamMessage."""
19 @abstractmethod
20 async def parse_message(self, message: MsgType) -> "StreamMessage[MsgType]":
21 """Parse a raw message into a StreamMessage."""
22 ...
25class DecoderProto(Protocol):
26 """Protocol for decoding StreamMessage into DecodedMessage."""
28 @abstractmethod
29 async def decode_message(self, msg: "StreamMessage[Any]") -> "DecodedMessage":
30 """Decode a StreamMessage into a DecodedMessage."""
31 ...
34class BatchParserProto(Protocol[MsgType]):
35 """Protocol for parsing a batch of raw messages into StreamMessage."""
37 @abstractmethod
38 async def parse_batch(
39 self, messages: Sequence[MsgType]
40 ) -> "StreamMessage[Sequence[MsgType]]":
41 """Parse a batch of raw messages into a StreamMessage."""
42 ...
45class BatchDecoderProto(Protocol[MsgType]):
46 """Protocol for decoding a batch of StreamMessage into list of DecodedMessage."""
48 @abstractmethod
49 async def decode_batch(
50 self, msg: "StreamMessage[Sequence[MsgType]]"
51 ) -> list["DecodedMessage"]:
52 """Decode a batch of StreamMessage into a list of DecodedMessage."""
53 ...
56class CodecProto(Protocol):
57 """Protocol for encoding and decoding message bodies."""
59 @abstractmethod
60 async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage":
61 """Decode a StreamMessage body into a Python object."""
62 ...
64 @abstractmethod
65 async def encode(
66 self,
67 msg: "SendableMessage",
68 serializer: "SerializerProto | None" = None,
69 ) -> tuple[bytes, str | None]:
70 """Encode a Python object into bytes and content_type."""
71 ...
74class DefaultCodec:
75 """Default codec that delegates to the shared encode_message/decode_message functions."""
77 async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage":
78 return decode_message(msg)
80 async def encode(
81 self,
82 msg: "SendableMessage",
83 serializer: "SerializerProto | None" = None,
84 ) -> tuple[bytes, str | None]:
85 return encode_message(msg, serializer)