Coverage for faststream / nats / parser.py: 94%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Any 

2 

3from faststream._internal.utils.path import match_path 

4from faststream.message import ( 

5 StreamMessage, 

6 decode_message, 

7) 

8from faststream.nats.message import ( 

9 NatsBatchMessage, 

10 NatsKvMessage, 

11 NatsMessage, 

12 NatsObjMessage, 

13) 

14from faststream.nats.schemas.js_stream import compile_nats_wildcard 

15 

16if TYPE_CHECKING: 

17 from nats.aio.msg import Msg 

18 from nats.js.api import ObjectInfo 

19 from nats.js.kv import KeyValue 

20 

21 from faststream._internal.basic_types import DecodedMessage 

22 

23 

class NatsBaseParser:
    """Shared base for the NATS parsers.

    Stores the regular expression compiled from a subject pattern and offers
    a default body decoder.
    """

    def __init__(
        self,
        *,
        pattern: str,
    ) -> None:
        """Compile ``pattern`` and remember the resulting regex.

        Args:
            pattern: Pattern passed to ``compile_nats_wildcard``.
        """
        # ``compile_nats_wildcard`` returns a pair; only the first item is kept.
        compiled_re, _ = compile_nats_wildcard(pattern)
        self._path_re = compiled_re

    async def decode_message(
        self,
        msg: "StreamMessage[Any]",
    ) -> "DecodedMessage":
        """Decode ``msg`` by delegating to the module-level ``decode_message``.

        Args:
            msg: The already parsed stream message.

        Returns:
            Whatever the module-level ``decode_message`` helper returns.
        """
        decoded = decode_message(msg)
        return decoded

40 

41 

class NatsParser(NatsBaseParser):
    """Parser that wraps NATS core messages into ``NatsMessage`` objects."""

    def __init__(self, *, pattern: str, is_ack_disabled: bool) -> None:
        """Set up the parser.

        Args:
            pattern: Subject pattern forwarded to the base class.
            is_ack_disabled: When true, every parsed raw message gets its
                ``_ackd`` attribute set to ``True``.
        """
        super().__init__(pattern=pattern)

        self.is_ack_disabled = is_ack_disabled

    async def parse_message(
        self,
        message: "Msg",
    ) -> "StreamMessage[Msg]":
        """Build a ``NatsMessage`` from a raw core message.

        Args:
            message: The raw NATS message.

        Returns:
            A ``NatsMessage`` carrying the raw message, its data, the matched
            path, the reply subject and header-derived metadata.
        """
        matched_path = match_path(self._path_re, message.subject)

        # A missing/empty header mapping is normalised to an empty dict.
        msg_headers = message.header or {}

        if self.is_ack_disabled:
            # NOTE(review): presumably marks the raw message as already
            # acknowledged so no ack is attempted later; confirm against nats-py.
            message._ackd = True

        content_type = msg_headers.get("content-type", "")
        message_id = msg_headers.get("message_id")
        correlation_id = msg_headers.get("correlation_id")

        return NatsMessage(
            raw_message=message,
            body=message.data,
            path=matched_path,
            reply_to=message.reply,
            headers=msg_headers,
            content_type=content_type,
            message_id=message_id,
            correlation_id=correlation_id,
        )

71 

72 

class JsParser(NatsBaseParser):
    """Parser that wraps NATS JS messages into ``NatsMessage`` objects."""

    async def parse_message(
        self,
        message: "Msg",
    ) -> "StreamMessage[Msg]":
        """Build a ``NatsMessage`` from a raw JS message.

        Unlike ``NatsParser``, the reply subject is read from the
        ``"reply_to"`` header (empty string when absent) rather than from
        ``message.reply``.

        Args:
            message: The raw NATS message.

        Returns:
            A ``NatsMessage`` carrying the raw message, its data, the matched
            path and header-derived metadata.
        """
        matched_path = match_path(self._path_re, message.subject)

        # A missing/empty header mapping is normalised to an empty dict.
        msg_headers = message.header or {}

        # Differs from core: the reply subject travels in the headers.
        reply_subject = msg_headers.get("reply_to", "")
        content_type = msg_headers.get("content-type")
        message_id = msg_headers.get("message_id")
        correlation_id = msg_headers.get("correlation_id")

        return NatsMessage(
            raw_message=message,
            body=message.data,
            path=matched_path,
            reply_to=reply_subject,
            headers=msg_headers,
            content_type=content_type,
            message_id=message_id,
            correlation_id=correlation_id,
        )

94 

95 

class BatchParser(JsParser):
    """Parser that wraps a list of NATS messages into one ``NatsBatchMessage``."""

    async def parse_batch(
        self,
        message: list["Msg"],
    ) -> "StreamMessage[list[Msg]]":
        """Combine a list of raw messages into a single batch message.

        The path is matched against the subject of the first message only;
        an empty batch yields an empty path and empty headers.

        Args:
            message: The raw messages that make up the batch.

        Returns:
            A ``NatsBatchMessage`` whose body is the list of payloads, whose
            ``headers`` are those of the first message and whose
            ``batch_headers`` hold one header mapping per message.
        """
        path = match_path(self._path_re, message[0].subject) if message else {}

        # One entry per raw message; absent headers become an empty dict.
        batch_headers: list[dict[str, str]] = [m.headers or {} for m in message]
        body: list[bytes] = [m.data for m in message]

        # The batch-level headers mirror the first message's headers.
        headers = batch_headers[0] if batch_headers else {}

        return NatsBatchMessage(
            raw_message=message,
            body=body,
            path=path,
            headers=headers,
            batch_headers=batch_headers,
        )

    async def decode_batch(
        self,
        msg: "StreamMessage[list[Msg]]",
    ) -> list["DecodedMessage"]:
        """Decode every raw message contained in a batch.

        Each raw message is first parsed with ``parse_message`` and the result
        is passed to the module-level ``decode_message``.

        Args:
            msg: The batch message whose ``raw_message`` is iterated.

        Returns:
            The decoded payloads, in the order of ``msg.raw_message``.
        """
        return [
            decode_message(await self.parse_message(raw))
            for raw in msg.raw_message
        ]

137 

138 

class KvParser(NatsBaseParser):
    """A class to parse NATS key-value entries."""

    async def parse_message(
        self,
        msg: "KeyValue.Entry",
    ) -> StreamMessage["KeyValue.Entry"]:
        """Build a ``NatsKvMessage`` from a key-value entry.

        Args:
            msg: The entry to wrap.

        Returns:
            A ``NatsKvMessage`` whose body is ``msg.value`` and whose path is
            the result of matching ``msg.key`` against the stored regex.
        """
        entry_value = msg.value
        key_path = match_path(self._path_re, msg.key)
        return NatsKvMessage(raw_message=msg, body=entry_value, path=key_path)

149 

150 

class ObjParser(NatsBaseParser):
    """A class to parse NATS object-store ``ObjectInfo`` records."""

    async def parse_message(self, msg: "ObjectInfo") -> StreamMessage["ObjectInfo"]:
        """Build a ``NatsObjMessage`` from an ``ObjectInfo``.

        Args:
            msg: The object info to wrap.

        Returns:
            A ``NatsObjMessage`` whose body is ``msg.name``.
        """
        object_name = msg.name
        return NatsObjMessage(raw_message=msg, body=object_name)

155 body=msg.name, 

156 )