Coverage for faststream / _internal / parser.py: 91%

11 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from abc import abstractmethod 

2from collections.abc import Sequence 

3from typing import TYPE_CHECKING, Any, Protocol, TypeVar 

4 

5from faststream.message.utils import decode_message, encode_message 

6 

7if TYPE_CHECKING: 

8 from fast_depends.library.serializer import SerializerProto 

9 

10 from faststream._internal.basic_types import DecodedMessage, SendableMessage 

11 from faststream.message import StreamMessage 

12 

13MsgType = TypeVar("MsgType") 

14 

15 

16class ParserProto(Protocol[MsgType]): 

17 """Protocol for parsing raw messages into StreamMessage.""" 

18 

19 @abstractmethod 

20 async def parse_message(self, message: MsgType) -> "StreamMessage[MsgType]": 

21 """Parse a raw message into a StreamMessage.""" 

22 ... 

23 

24 

25class DecoderProto(Protocol): 

26 """Protocol for decoding StreamMessage into DecodedMessage.""" 

27 

28 @abstractmethod 

29 async def decode_message(self, msg: "StreamMessage[Any]") -> "DecodedMessage": 

30 """Decode a StreamMessage into a DecodedMessage.""" 

31 ... 

32 

33 

34class BatchParserProto(Protocol[MsgType]): 

35 """Protocol for parsing a batch of raw messages into StreamMessage.""" 

36 

37 @abstractmethod 

38 async def parse_batch( 

39 self, messages: Sequence[MsgType] 

40 ) -> "StreamMessage[Sequence[MsgType]]": 

41 """Parse a batch of raw messages into a StreamMessage.""" 

42 ... 

43 

44 

45class BatchDecoderProto(Protocol[MsgType]): 

46 """Protocol for decoding a batch of StreamMessage into list of DecodedMessage.""" 

47 

48 @abstractmethod 

49 async def decode_batch( 

50 self, msg: "StreamMessage[Sequence[MsgType]]" 

51 ) -> list["DecodedMessage"]: 

52 """Decode a batch of StreamMessage into a list of DecodedMessage.""" 

53 ... 

54 

55 

56class CodecProto(Protocol): 

57 """Protocol for encoding and decoding message bodies.""" 

58 

59 @abstractmethod 

60 async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": 

61 """Decode a StreamMessage body into a Python object.""" 

62 ... 

63 

64 @abstractmethod 

65 async def encode( 

66 self, 

67 msg: "SendableMessage", 

68 serializer: "SerializerProto | None" = None, 

69 ) -> tuple[bytes, str | None]: 

70 """Encode a Python object into bytes and content_type.""" 

71 ... 

72 

73 

74class DefaultCodec: 

75 """Default codec that delegates to the shared encode_message/decode_message functions.""" 

76 

77 async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": 

78 return decode_message(msg) 

79 

80 async def encode( 

81 self, 

82 msg: "SendableMessage", 

83 serializer: "SerializerProto | None" = None, 

84 ) -> tuple[bytes, str | None]: 

85 return encode_message(msg, serializer)