Coverage for faststream / nats / message.py: 62%

38 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from nats.aio.msg import Msg 

2from nats.js.api import ObjectInfo 

3from nats.js.kv import KeyValue 

4 

5from faststream.message import StreamMessage 

6 

7 

8class NatsMessage(StreamMessage[Msg]): 

9 """A class to represent a NATS message.""" 

10 

11 async def ack(self) -> None: 

12 # Check `self.raw_message._ackd` instead of `self.committed` 

13 # to be compatible with `self.raw_message.ack()` 

14 if not self.raw_message._ackd: 

15 await self.raw_message.ack() 

16 await super().ack() 

17 

18 async def ack_sync(self) -> None: 

19 if not self.raw_message._ackd: 19 ↛ 21line 19 didn't jump to line 21 because the condition on line 19 was always true

20 await self.raw_message.ack_sync() 

21 await super().ack() 

22 

23 async def nack( 

24 self, 

25 delay: float | None = None, 

26 ) -> None: 

27 if not self.raw_message._ackd: 

28 await self.raw_message.nak(delay=delay) 

29 await super().nack() 

30 

31 async def reject(self) -> None: 

32 if not self.raw_message._ackd: 32 ↛ 33line 32 didn't jump to line 33 because the condition on line 32 was never true

33 await self.raw_message.term() 

34 await super().reject() 

35 

36 async def in_progress(self) -> None: 

37 if not self.raw_message._ackd: 

38 await self.raw_message.in_progress() 

39 

40 

41class NatsBatchMessage(StreamMessage[list[Msg]]): 

42 """A class to represent a NATS batch message.""" 

43 

44 async def ack(self) -> None: 

45 for m in filter( 

46 lambda m: not m._ackd, 

47 self.raw_message, 

48 ): 

49 await m.ack() 

50 

51 await super().ack() 

52 

53 async def nack( 

54 self, 

55 delay: float | None = None, 

56 ) -> None: 

57 for m in filter( 

58 lambda m: not m._ackd, 

59 self.raw_message, 

60 ): 

61 await m.nak(delay=delay) 

62 

63 await super().nack() 

64 

65 async def reject(self) -> None: 

66 for m in filter( 

67 lambda m: not m._ackd, 

68 self.raw_message, 

69 ): 

70 await m.term() 

71 

72 await super().reject() 

73 

74 async def in_progress(self) -> None: 

75 for m in filter( 

76 lambda m: not m._ackd, 

77 self.raw_message, 

78 ): 

79 await m.in_progress() 

80 

81 

82class NatsKvMessage(StreamMessage[KeyValue.Entry]): 

83 pass 

84 

85 

86class NatsObjMessage(StreamMessage[ObjectInfo]): 

87 pass