Coverage for faststream / message / message.py: 96%

40 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from enum import Enum 

2from typing import ( 

3 TYPE_CHECKING, 

4 Any, 

5 Generic, 

6 Optional, 

7 TypeVar, 

8) 

9from uuid import uuid4 

10 

11from .source_type import SourceType 

12 

13if TYPE_CHECKING: 

14 from faststream._internal.types import AsyncCallable 

15 

# Defined in this module to prevent circular imports.
MsgType = TypeVar("MsgType")

# Unique sentinel marking "no cached value" in the decode cache, so that a
# cached result which is itself `None` can be told apart from a cache miss.
_NOT_CACHED = object()

20 

21 

class AckStatus(str, Enum):
    """Commit states that can be recorded on a message.

    Members are also `str` instances whose value equals their name.
    """

    ACKED = "ACKED"
    NACKED = "NACKED"
    REJECTED = "REJECTED"

26 

27 

class StreamMessage(Generic[MsgType]):
    """Generic class to represent a stream message.

    Attributes:
        raw_message: The original message object this instance wraps.
        body: The message payload.
        reply_to: Reply destination; an empty string when not set.
        content_type: Content type of the body, if given.
        source_type: Value of the `source_type` constructor argument.
        headers: Message headers (an empty dict when none were passed).
        batch_headers: Per-message headers (an empty list when none were passed).
        path: Path parameters (an empty dict when none were passed).
        correlation_id: Given correlation id, or a freshly generated UUID4 string.
        message_id: Given message id, or `correlation_id` as a fallback.
        committed: The recorded `AckStatus`, or `None` while nothing is recorded.
        processed: Initialised to `False`; not modified inside this class.
    """

    def __init__(
        self,
        raw_message: "MsgType",
        body: bytes | Any,
        *,
        headers: dict[str, Any] | None = None,
        reply_to: str = "",
        batch_headers: list[dict[str, Any]] | None = None,
        path: dict[str, Any] | None = None,
        content_type: str | None = None,
        correlation_id: str | None = None,
        message_id: str | None = None,
        source_type: SourceType = SourceType.CONSUME,
    ) -> None:
        """Store the message data and fill in defaults for omitted options.

        Falsy `headers`, `batch_headers` and `path` are replaced by new empty
        containers; a falsy `correlation_id` is replaced by a new UUID4 string
        and a falsy `message_id` by the resulting `correlation_id`.
        """
        self.raw_message = raw_message
        self.body = body
        self.reply_to = reply_to
        self.content_type = content_type
        self.source_type = source_type

        self.headers = headers or {}
        self.batch_headers = batch_headers or []
        self.path = path or {}
        self.correlation_id = correlation_id or str(uuid4())
        self.message_id = message_id or self.correlation_id

        self.committed: AckStatus | None = None
        self.processed = False

        # Setup later
        self.__decoder: AsyncCallable | None = None
        # Cache values between filters and tests, keyed by the decoder used.
        self.__decoded_caches: dict[Any, Any] = {}

    def set_decoder(self, decoder: "AsyncCallable") -> None:
        """Attach the decoder that `decode()` will call."""
        self.__decoder = decoder

    def clear_cache(self) -> None:
        """Drop every cached decode result."""
        self.__decoded_caches.clear()

    def __repr__(self) -> str:
        """Return a debug representation; `reply_to` is listed only when set."""
        inner = ", ".join(
            filter(
                bool,
                (
                    f"body={self.body!r}",
                    f"content_type={self.content_type}",
                    f"message_id={self.message_id}",
                    f"correlation_id={self.correlation_id}",
                    f"reply_to={self.reply_to}" if self.reply_to else "",
                    f"headers={self.headers}",
                    f"path={self.path}",
                    f"committed={self.committed}",
                    f"raw_message={self.raw_message}",
                ),
            ),
        )

        return f"{self.__class__.__name__}({inner})"

    async def decode(self) -> Optional["Any"]:
        """Decode the message with the lazily attached decoder.

        Returns a cache after first usage. To prevent such behavior, please call
        `message.clear_cache()` after `message.body` changes.

        Raises:
            AssertionError: If no decoder was attached via `set_decoder()`.
        """
        decoder = self.__decoder
        if decoder is None:
            # Raised explicitly instead of using `assert`, so the check is not
            # stripped when Python runs with `-O`.
            msg = "You should call `set_decoder()` method first."
            raise AssertionError(msg)

        # The sentinel distinguishes "never decoded" from a cached `None`.
        result = self.__decoded_caches.get(decoder, _NOT_CACHED)
        if result is _NOT_CACHED:
            # Keyed by the decoder that actually produced the value.
            result = self.__decoded_caches[decoder] = await decoder(self)

        return result

    async def ack(self) -> None:
        """Record `AckStatus.ACKED` unless a status is already recorded."""
        if self.committed is None:
            self.committed = AckStatus.ACKED

    async def nack(self) -> None:
        """Record `AckStatus.NACKED` unless a status is already recorded."""
        if self.committed is None:
            self.committed = AckStatus.NACKED

    async def reject(self) -> None:
        """Record `AckStatus.REJECTED` unless a status is already recorded."""
        if self.committed is None:
            self.committed = AckStatus.REJECTED