Coverage for faststream / nats / parser.py: 94%
44 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Any
3from faststream._internal.utils.path import match_path
4from faststream.message import (
5 StreamMessage,
6 decode_message,
7)
8from faststream.nats.message import (
9 NatsBatchMessage,
10 NatsKvMessage,
11 NatsMessage,
12 NatsObjMessage,
13)
14from faststream.nats.schemas.js_stream import compile_nats_wildcard
16if TYPE_CHECKING:
17 from nats.aio.msg import Msg
18 from nats.js.api import ObjectInfo
19 from nats.js.kv import KeyValue
21 from faststream._internal.basic_types import DecodedMessage
24class NatsBaseParser:
25 """A class to parse NATS messages."""
27 def __init__(
28 self,
29 *,
30 pattern: str,
31 ) -> None:
32 path_re, _ = compile_nats_wildcard(pattern)
33 self._path_re = path_re
35 async def decode_message(
36 self,
37 msg: "StreamMessage[Any]",
38 ) -> "DecodedMessage":
39 return decode_message(msg)
42class NatsParser(NatsBaseParser):
43 """A class to parse NATS core messages."""
45 def __init__(self, *, pattern: str, is_ack_disabled: bool) -> None:
46 super().__init__(pattern=pattern)
48 self.is_ack_disabled = is_ack_disabled
50 async def parse_message(
51 self,
52 message: "Msg",
53 ) -> "StreamMessage[Msg]":
54 path = match_path(self._path_re, message.subject)
56 headers = message.header or {}
58 if self.is_ack_disabled: 58 ↛ 61line 58 didn't jump to line 61 because the condition on line 58 was always true
59 message._ackd = True
61 return NatsMessage(
62 raw_message=message,
63 body=message.data,
64 path=path,
65 reply_to=message.reply,
66 headers=headers,
67 content_type=headers.get("content-type", ""),
68 message_id=headers.get("message_id"),
69 correlation_id=headers.get("correlation_id"),
70 )
73class JsParser(NatsBaseParser):
74 """A class to parse NATS JS messages."""
76 async def parse_message(
77 self,
78 message: "Msg",
79 ) -> "StreamMessage[Msg]":
80 path = match_path(self._path_re, message.subject)
82 headers = message.header or {}
84 return NatsMessage(
85 raw_message=message,
86 body=message.data,
87 path=path,
88 reply_to=headers.get("reply_to", ""), # differ from core
89 headers=headers,
90 content_type=headers.get("content-type"),
91 message_id=headers.get("message_id"),
92 correlation_id=headers.get("correlation_id"),
93 )
96class BatchParser(JsParser):
97 """A class to parse NATS batch messages."""
99 async def parse_batch(
100 self,
101 message: list["Msg"],
102 ) -> "StreamMessage[list[Msg]]":
103 body: list[bytes] = []
104 batch_headers: list[dict[str, str]] = []
106 if message: 106 ↛ 114line 106 didn't jump to line 114 because the condition on line 106 was always true
107 path = match_path(self._path_re, message[0].subject)
109 for m in message:
110 batch_headers.append(m.headers or {})
111 body.append(m.data)
113 else:
114 path = {}
116 headers = next(iter(batch_headers), {})
118 return NatsBatchMessage(
119 raw_message=message,
120 body=body,
121 path=path,
122 headers=headers,
123 batch_headers=batch_headers,
124 )
126 async def decode_batch(
127 self,
128 msg: "StreamMessage[list[Msg]]",
129 ) -> list["DecodedMessage"]:
130 data: list[DecodedMessage] = []
132 for m in msg.raw_message:
133 one_msg = await self.parse_message(m)
134 data.append(decode_message(one_msg))
136 return data
139class KvParser(NatsBaseParser):
140 async def parse_message(
141 self,
142 msg: "KeyValue.Entry",
143 ) -> StreamMessage["KeyValue.Entry"]:
144 return NatsKvMessage(
145 raw_message=msg,
146 body=msg.value,
147 path=match_path(self._path_re, msg.key),
148 )
151class ObjParser(NatsBaseParser):
152 async def parse_message(self, msg: "ObjectInfo") -> StreamMessage["ObjectInfo"]:
153 return NatsObjMessage(
154 raw_message=msg,
155 body=msg.name,
156 )