Coverage for faststream / nats / subscriber / usecases / stream_basic.py: 96%

41 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import AsyncIterator 

2from typing import TYPE_CHECKING, Any, Optional 

3 

4from nats.errors import ConnectionClosedError, TimeoutError 

5from typing_extensions import override 

6 

7from faststream._internal.endpoint.utils import process_msg 

8from faststream.nats.parser import JsParser 

9 

10from .basic import DefaultSubscriber 

11 

12if TYPE_CHECKING: 

13 from nats.aio.msg import Msg 

14 from nats.js import JetStreamContext 

15 

16 from faststream._internal.endpoint.subscriber import SubscriberSpecification 

17 from faststream._internal.endpoint.subscriber.call_item import CallsCollection 

18 from faststream.message import StreamMessage 

19 from faststream.nats.message import NatsMessage 

20 from faststream.nats.schemas import JStream 

21 from faststream.nats.subscriber.config import NatsSubscriberConfig 

22 

23 

class StreamSubscriber(DefaultSubscriber["Msg"]):
    """Subscriber that parses messages with `JsParser` and can pull them on demand.

    Besides what the parent class provides, this class stores the ``stream``
    and ``queue`` it was created with and offers two manual consumption paths
    (`get_one` and async iteration). Both paths share one lazily created
    pull subscription kept in ``_fetch_sub``.
    """

    # Pull subscription used by `get_one` / `__aiter__`; created on first use.
    _fetch_sub: Optional["JetStreamContext.PullSubscription"]

    def __init__(
        self,
        config: "NatsSubscriberConfig",
        specification: "SubscriberSpecification[Any, Any]",
        calls: "CallsCollection[Msg]",
        *,
        stream: "JStream",
        queue: str,
    ) -> None:
        """Install the JetStream parser on ``config`` and initialise the parent.

        Args:
            config: Subscriber configuration; its ``parser`` and ``decoder``
                attributes are overwritten with the `JsParser` callables.
            specification: Forwarded unchanged to the parent constructor.
            calls: Forwarded unchanged to the parent constructor.
            stream: Stream object; its ``name`` is used in the log context.
            queue: Queue name, used in the log context.
        """
        # The parser must be set on the config before the parent reads it.
        parser = JsParser(pattern=config.subject)
        config.decoder = parser.decode_message
        config.parser = parser.parse_message
        super().__init__(config, specification, calls)

        self.queue = queue
        self.stream = stream

    def get_log_context(
        self,
        message: Optional["StreamMessage[Msg]"],
    ) -> dict[str, str]:
        """Log context factory used in the `self.consume` scope.

        Args:
            message: Message which we are building context for
        """
        return self.build_log_context(
            message=message,
            subject=self._resolved_subject_string,
            queue=self.queue,
            stream=self.stream.name,
        )

    async def _create_fetch_sub(self) -> "JetStreamContext.PullSubscription":
        """Open a new pull subscription from this subscriber's options.

        Shared by `get_one` and `__aiter__` so the subscription options are
        assembled in exactly one place. The caller is responsible for caching
        the result in ``self._fetch_sub``.

        Returns:
            The subscription returned by ``self.jetstream.pull_subscribe``.

        Raises:
            KeyError: If one of the required keys is missing from
                ``self.extra_options``.
        """
        extra_options = {
            "pending_bytes_limit": self.extra_options["pending_bytes_limit"],
            "pending_msgs_limit": self.extra_options["pending_msgs_limit"],
            "durable": self.extra_options["durable"],
            "stream": self.extra_options["stream"],
        }
        # `inbox_prefix` is optional: forward it only when set to a truthy value.
        if inbox_prefix := self.extra_options.get("inbox_prefix"):
            extra_options["inbox_prefix"] = inbox_prefix

        return await self.jetstream.pull_subscribe(
            subject=self.clear_subject,
            config=self.config,
            **extra_options,
        )

    @override
    async def get_one(self, *, timeout: float = 5) -> Optional["NatsMessage"]:
        """Fetch and parse a single message from the pull subscription.

        Args:
            timeout: Forwarded as ``timeout`` to the subscription's ``fetch``.

        Returns:
            The processed message, or ``None`` when ``fetch`` raises
            ``TimeoutError`` or ``ConnectionClosedError``.
        """
        assert not self.calls, (
            "You can't use `get_one` method if subscriber has registered handlers."
        )

        if not self._fetch_sub:
            self._fetch_sub = await self._create_fetch_sub()

        try:
            raw_message = (
                await self._fetch_sub.fetch(
                    batch=1,
                    timeout=timeout,
                )
            )[0]
        except (TimeoutError, ConnectionClosedError):
            return None

        context = self._outer_config.fd_config.context
        async_parser, async_decoder = self._get_parser_and_decoder()

        msg: NatsMessage = await process_msg(  # type: ignore[assignment]
            msg=raw_message,
            middlewares=(
                m(raw_message, context=context) for m in self._broker_middlewares
            ),
            parser=async_parser,
            decoder=async_decoder,
        )
        return msg

    @override
    async def __aiter__(self) -> AsyncIterator["NatsMessage"]:  # type: ignore[override]
        """Yield processed messages one at a time from the pull subscription.

        Each iteration calls ``fetch`` with ``batch=1`` and ``timeout=None``.
        Unlike `get_one`, exceptions raised by ``fetch`` are not caught here
        and propagate to the consumer of the iterator.
        """
        assert not self.calls, (
            "You can't use iterator if subscriber has registered handlers."
        )

        if not self._fetch_sub:
            self._fetch_sub = await self._create_fetch_sub()

        # Resolved once, before the loop, and reused for every message.
        context = self._outer_config.fd_config.context
        async_parser, async_decoder = self._get_parser_and_decoder()

        while True:
            raw_message = (
                await self._fetch_sub.fetch(
                    batch=1,
                    timeout=None,
                )
            )[0]

            msg: NatsMessage = await process_msg(  # type: ignore[assignment]
                msg=raw_message,
                middlewares=(
                    m(raw_message, context=context) for m in self._broker_middlewares
                ),
                parser=async_parser,
                decoder=async_decoder,
            )
            yield msg