Coverage for faststream / nats / subscriber / usecases / core_subscriber.py: 84%

47 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import AsyncIterator 

2from typing import TYPE_CHECKING, Any, Optional 

3 

4from nats.errors import TimeoutError 

5from typing_extensions import override 

6 

7from faststream._internal.endpoint.subscriber.mixins import ConcurrentMixin 

8from faststream._internal.endpoint.utils import process_msg 

9from faststream.nats.parser import NatsParser 

10 

11from .basic import DefaultSubscriber 

12 

13if TYPE_CHECKING: 

14 from nats.aio.msg import Msg 

15 from nats.aio.subscription import Subscription 

16 

17 from faststream._internal.endpoint.subscriber import SubscriberSpecification 

18 from faststream._internal.endpoint.subscriber.call_item import CallsCollection 

19 from faststream.message import StreamMessage 

20 from faststream.nats.message import NatsMessage 

21 from faststream.nats.subscriber.config import NatsSubscriberConfig 

22 

23 

24class CoreSubscriber(DefaultSubscriber["Msg"]): 

25 subscription: Optional["Subscription"] 

26 _fetch_sub: Optional["Subscription"] 

27 

28 def __init__( 

29 self, 

30 config: "NatsSubscriberConfig", 

31 specification: "SubscriberSpecification[Any, Any]", 

32 calls: "CallsCollection[Msg]", 

33 *, 

34 queue: str, 

35 ) -> None: 

36 parser = NatsParser( 

37 pattern=config.subject, 

38 is_ack_disabled=True, # core subscriber has no ack policy 

39 ) 

40 config.parser = parser.parse_message 

41 config.decoder = parser.decode_message 

42 super().__init__(config, specification, calls) 

43 

44 self.queue = queue 

45 

46 @override 

47 async def get_one( 

48 self, 

49 *, 

50 timeout: float = 5.0, 

51 ) -> "NatsMessage | None": 

52 assert not self.calls, ( 

53 "You can't use `get_one` method if subscriber has registered handlers." 

54 ) 

55 

56 if self._fetch_sub is None: 56 ↛ 63line 56 didn't jump to line 63 because the condition on line 56 was always true

57 fetch_sub = self._fetch_sub = await self.connection.subscribe( 

58 subject=self.clear_subject, 

59 queue=self.queue, 

60 **self.extra_options, 

61 ) 

62 else: 

63 fetch_sub = self._fetch_sub 

64 

65 try: 

66 raw_message = await fetch_sub.next_msg(timeout=timeout) 

67 except TimeoutError: 

68 return None 

69 

70 context = self._outer_config.fd_config.context 

71 

72 async_parser, async_decoder = self._get_parser_and_decoder() 

73 

74 msg: NatsMessage = await process_msg( # type: ignore[assignment] 

75 msg=raw_message, 

76 middlewares=( 

77 m(raw_message, context=context) for m in self._broker_middlewares 

78 ), 

79 parser=async_parser, 

80 decoder=async_decoder, 

81 ) 

82 return msg 

83 

84 @override 

85 async def __aiter__(self) -> AsyncIterator["NatsMessage"]: # type: ignore[override] 

86 assert not self.calls, ( 

87 "You can't use iterator if subscriber has registered handlers." 

88 ) 

89 

90 if self._fetch_sub is None: 90 ↛ 97line 90 didn't jump to line 97 because the condition on line 90 was always true

91 fetch_sub = self._fetch_sub = await self.connection.subscribe( 

92 subject=self.clear_subject, 

93 queue=self.queue, 

94 **self.extra_options, 

95 ) 

96 else: 

97 fetch_sub = self._fetch_sub 

98 

99 context = self._outer_config.fd_config.context 

100 async_parser, async_decoder = self._get_parser_and_decoder() 

101 

102 async for raw_message in fetch_sub.messages: 102 ↛ exitline 102 didn't return from function '__aiter__' because the loop on line 102 didn't complete

103 msg: NatsMessage = await process_msg( # type: ignore[assignment] 

104 msg=raw_message, 

105 middlewares=( 

106 m(raw_message, context=context) for m in self._broker_middlewares 

107 ), 

108 parser=async_parser, 

109 decoder=async_decoder, 

110 ) 

111 yield msg 

112 

113 async def _create_subscription(self) -> None: 

114 """Create NATS subscription and start consume task.""" 

115 if self.subscription: 115 ↛ 116line 115 didn't jump to line 116 because the condition on line 115 was never true

116 return 

117 

118 self.subscription = await self.connection.subscribe( 

119 subject=self.clear_subject, 

120 queue=self.queue, 

121 cb=self.consume, 

122 **self.extra_options, 

123 ) 

124 

125 def get_log_context( 

126 self, 

127 message: Optional["StreamMessage[Msg]"], 

128 ) -> dict[str, str]: 

129 """Log context factory using in `self.consume` scope. 

130 

131 Args: 

132 message: Message which we are building context for 

133 """ 

134 return self.build_log_context( 

135 message=message, 

136 subject=self.subject, 

137 queue=self.queue, 

138 ) 

139 

140 

141class ConcurrentCoreSubscriber(ConcurrentMixin["Msg"], CoreSubscriber): 

142 @override 

143 async def _create_subscription(self) -> None: 

144 """Create NATS subscription and start consume task.""" 

145 if self.subscription: 145 ↛ 146line 145 didn't jump to line 146 because the condition on line 145 was never true

146 return 

147 

148 self.start_consume_task() 

149 

150 self.subscription = await self.connection.subscribe( 

151 subject=self.clear_subject, 

152 queue=self.queue, 

153 cb=self._put_msg, 

154 **self.extra_options, 

155 )