Coverage for faststream / nats / message.py: 62%
38 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from nats.aio.msg import Msg
2from nats.js.api import ObjectInfo
3from nats.js.kv import KeyValue
5from faststream.message import StreamMessage
8class NatsMessage(StreamMessage[Msg]):
9 """A class to represent a NATS message."""
11 async def ack(self) -> None:
12 # Check `self.raw_message._ackd` instead of `self.committed`
13 # to be compatible with `self.raw_message.ack()`
14 if not self.raw_message._ackd:
15 await self.raw_message.ack()
16 await super().ack()
18 async def ack_sync(self) -> None:
19 if not self.raw_message._ackd: 19 ↛ 21line 19 didn't jump to line 21 because the condition on line 19 was always true
20 await self.raw_message.ack_sync()
21 await super().ack()
23 async def nack(
24 self,
25 delay: float | None = None,
26 ) -> None:
27 if not self.raw_message._ackd:
28 await self.raw_message.nak(delay=delay)
29 await super().nack()
31 async def reject(self) -> None:
32 if not self.raw_message._ackd: 32 ↛ 33line 32 didn't jump to line 33 because the condition on line 32 was never true
33 await self.raw_message.term()
34 await super().reject()
36 async def in_progress(self) -> None:
37 if not self.raw_message._ackd:
38 await self.raw_message.in_progress()
41class NatsBatchMessage(StreamMessage[list[Msg]]):
42 """A class to represent a NATS batch message."""
44 async def ack(self) -> None:
45 for m in filter(
46 lambda m: not m._ackd,
47 self.raw_message,
48 ):
49 await m.ack()
51 await super().ack()
53 async def nack(
54 self,
55 delay: float | None = None,
56 ) -> None:
57 for m in filter(
58 lambda m: not m._ackd,
59 self.raw_message,
60 ):
61 await m.nak(delay=delay)
63 await super().nack()
65 async def reject(self) -> None:
66 for m in filter(
67 lambda m: not m._ackd,
68 self.raw_message,
69 ):
70 await m.term()
72 await super().reject()
74 async def in_progress(self) -> None:
75 for m in filter(
76 lambda m: not m._ackd,
77 self.raw_message,
78 ):
79 await m.in_progress()
82class NatsKvMessage(StreamMessage[KeyValue.Entry]):
83 pass
86class NatsObjMessage(StreamMessage[ObjectInfo]):
87 pass