Coverage for faststream / message / message.py: 96%
40 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from enum import Enum
2from typing import (
3 TYPE_CHECKING,
4 Any,
5 Generic,
6 Optional,
7 TypeVar,
8)
9from uuid import uuid4
11from .source_type import SourceType
13if TYPE_CHECKING:
14 from faststream._internal.types import AsyncCallable
16# prevent circular imports
17MsgType = TypeVar("MsgType")
19_NOT_CACHED = object()
class AckStatus(str, Enum):
    """Acknowledgement outcome of a message.

    Every member is also a ``str``, so it compares equal to its plain
    string value (``AckStatus.ACKED == "ACKED"``).
    """

    ACKED = "ACKED"
    NACKED = "NACKED"
    REJECTED = "REJECTED"
class StreamMessage(Generic[MsgType]):
    """Generic class to represent a stream message.

    Holds the original ``raw_message`` next to its body and metadata, records
    the acknowledgement state in ``committed`` and caches decoded results per
    decoder.
    """

    def __init__(
        self,
        raw_message: "MsgType",
        body: bytes | Any,
        *,
        headers: dict[str, Any] | None = None,
        reply_to: str = "",
        batch_headers: list[dict[str, Any]] | None = None,
        path: dict[str, Any] | None = None,
        content_type: str | None = None,
        correlation_id: str | None = None,
        message_id: str | None = None,
        source_type: SourceType = SourceType.CONSUME,
    ) -> None:
        """Store the message payload and metadata.

        Args:
            raw_message: The underlying message object being wrapped.
            body: The message payload.
            headers: Message headers; a falsy value becomes a new empty dict.
            reply_to: Reply destination; an empty string means "not set".
            batch_headers: Per-message headers; a falsy value becomes a new
                empty list.
            path: Path parameters; a falsy value becomes a new empty dict.
            content_type: Content type of ``body``, if known.
            correlation_id: Correlation identifier; a random UUID4 string is
                generated when the value is falsy.
            message_id: Message identifier; falls back to the (possibly
                generated) ``correlation_id`` when the value is falsy.
            source_type: Origin of the message.
        """
        self.raw_message = raw_message
        self.body = body
        self.reply_to = reply_to
        self.content_type = content_type
        self.source_type = source_type

        self.headers = headers or {}
        self.batch_headers = batch_headers or []
        self.path = path or {}
        self.correlation_id = correlation_id or str(uuid4())
        self.message_id = message_id or self.correlation_id

        # ``None`` until one of ack() / nack() / reject() is called.
        self.committed: AckStatus | None = None
        self.processed = False

        # Setup later
        self.__decoder: "AsyncCallable | None" = None
        self.__decoded_caches: dict[
            Any,
            Any,
        ] = {}  # Cache values between filters and tests

    def set_decoder(self, decoder: "AsyncCallable") -> None:
        """Set the async callable that `decode()` uses."""
        self.__decoder = decoder

    def clear_cache(self) -> None:
        """Drop every cached `decode()` result."""
        self.__decoded_caches.clear()

    def __repr__(self) -> str:
        """Return a debug representation; ``reply_to`` is omitted when empty."""
        inner = ", ".join(
            filter(
                bool,
                (
                    f"body={self.body!r}",
                    f"content_type={self.content_type}",
                    f"message_id={self.message_id}",
                    f"correlation_id={self.correlation_id}",
                    f"reply_to={self.reply_to}" if self.reply_to else "",
                    f"headers={self.headers}",
                    f"path={self.path}",
                    f"committed={self.committed}",
                    f"raw_message={self.raw_message}",
                ),
            ),
        )

        return f"{self.__class__.__name__}({inner})"

    async def decode(self) -> Optional["Any"]:
        """Decode the message with the lazily configured decoder.

        Returns a cache after first usage. To prevent such behavior, please call
        `message.clear_cache()` after `message.body` changes.

        Returns:
            The value produced by the decoder (cached per decoder).

        Raises:
            AssertionError: If `set_decoder()` has not been called yet.
        """
        # Bind once: the cache entry must be stored under the decoder that
        # actually produced the result, even if `set_decoder()` is called
        # while the decoder is being awaited.
        decoder = self.__decoder
        if not decoder:
            # Raised explicitly rather than via `assert`, which is stripped
            # under `python -O` and would leave a confusing TypeError instead.
            msg = "You should call `set_decoder()` method first."
            raise AssertionError(msg)

        if (
            result := self.__decoded_caches.get(decoder, _NOT_CACHED)
        ) is _NOT_CACHED:
            result = self.__decoded_caches[decoder] = await decoder(self)

        return result

    async def ack(self) -> None:
        """Mark the message as acknowledged unless a status is already set."""
        if self.committed is None:
            self.committed = AckStatus.ACKED

    async def nack(self) -> None:
        """Mark the message as not acknowledged unless a status is already set."""
        if self.committed is None:
            self.committed = AckStatus.NACKED

    async def reject(self) -> None:
        """Mark the message as rejected unless a status is already set."""
        if self.committed is None:
            self.committed = AckStatus.REJECTED