Coverage for faststream / exceptions.py: 91%

58 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Iterable 

2from pprint import pformat 

3from typing import Any 

4 

5 

6class FastStreamException(Exception): # noqa: N818 

7 """Basic FastStream exception class.""" 

8 

9 

10class IgnoredException(FastStreamException): 

11 """Basic Exception class ignoring by watcher context and log middleware.""" 

12 

13 

14class StopConsume(IgnoredException): 

15 """Raise it to stop Handler consuming.""" 

16 

17 def __str__(self) -> str: 

18 return "Consumer was stopped" 

19 

20 

21class StopApplication(IgnoredException, SystemExit): 

22 """Raise it to stop FastStream application.""" 

23 

24 def __str__(self) -> str: 

25 return "Application was stopped" 

26 

27 

28class HandlerException(IgnoredException): 

29 """Base Handler Exception.""" 

30 

31 

32class SkipMessage(HandlerException): 

33 """Watcher Instruction to skip message.""" 

34 

35 def __str__(self) -> str: 

36 return "Message was skipped" 

37 

38 

39class AckMessage(HandlerException): 

40 """Exception raised to acknowledge a message immediately. 

41 

42 This exception can be used to ack a message with additional options. 

43 To watch all allowed parameters, please take a look at your broker `message.ack(**extra_options)` method 

44 signature. 

45 

46 Args: 

47 extra_options (Any): Additional parameters that will be passed to `message.ack(**extra_options)` method. 

48 """ 

49 

50 def __init__(self, **extra_options: Any) -> None: 

51 self.extra_options = extra_options 

52 super().__init__() 

53 

54 def __str__(self) -> str: 

55 return "Message was acked" 

56 

57 

58class NackMessage(HandlerException): 

59 """Exception raised to negatively acknowledge a message immediately. 

60 

61 This exception can be used to nack a message with additional options. 

62 To watch all allowed parameters, please take a look to your broker's `message.nack(**extra_options)` method 

63 signature. 

64 

65 Args: 

66 kwargs (Any): Additional parameters that will be passed to `message.nack(**extra_options)` method. 

67 """ 

68 

69 def __init__(self, **kwargs: Any) -> None: 

70 self.extra_options = kwargs 

71 super().__init__() 

72 

73 def __str__(self) -> str: 

74 return "Message was nacked" 

75 

76 

77class RejectMessage(HandlerException): 

78 """Exception raised to reject a message immediately. 

79 

80 This exception can be used to reject a message with additional options. 

81 To watch all allowed parameters, please take a look to your broker's `message.reject(**extra_options)` method 

82 signature. 

83 

84 Args: 

85 kwargs (Any): Additional parameters that will be passed to `message.reject(**extra_options)` method. 

86 """ 

87 

88 def __init__(self, **kwargs: Any) -> None: 

89 self.extra_options = kwargs 

90 super().__init__() 

91 

92 def __str__(self) -> str: 

93 return "Message was rejected" 

94 

95 

96class SetupError(FastStreamException, ValueError): 

97 """Exception to raise at wrong method usage.""" 

98 

99 

100class StartupValidationError(FastStreamException, ValueError): 

101 """Exception to raise at startup hook validation error.""" 

102 

103 def __init__( 

104 self, 

105 missed_fields: Iterable[str] = (), 

106 invalid_fields: Iterable[str] = (), 

107 ) -> None: 

108 self.missed_fields = missed_fields 

109 self.invalid_fields = invalid_fields 

110 

111 def __str__(self) -> str: 

112 return ( 

113 f"\n Incorrect options `{' / '.join(f'--{i}' for i in (*self.missed_fields, *self.invalid_fields))}`" 

114 "\n You registered extra options in your application `lifespan/on_startup` hook, but set them wrong in CLI." 

115 ) 

116 

117 

118class FeatureNotSupportedException(FastStreamException, NotImplementedError): # noqa: N818 

119 """Raises at planned NotImplemented operation call.""" 

120 

121 

122class SubscriberNotFound(FastStreamException): 

123 """Raises as a service message or in tests.""" 

124 

125 

126class IncorrectState(FastStreamException): 

127 """Raises in FSM at wrong state calling.""" 

128 

129 

130class ContextError(FastStreamException, KeyError): 

131 """Raises if context exception occurred.""" 

132 

133 def __init__(self, context: Any, field: str) -> None: 

134 self.context = context 

135 self.field = field 

136 

137 def __str__(self) -> str: 

138 return "".join( 

139 ( 

140 f"\n Key `{self.field}` not found in the context\n ", 

141 pformat(self.context), 

142 ), 

143 ) 

144 

145 

146WRONG_PUBLISH_ARGS = SetupError( 

147 "You should use `reply_to` to send response to long-living queue " 

148 "and `rpc` to get response in sync mode.", 

149) 

150 

151 

152NOT_CONNECTED_YET = "Please, `connect()` the broker first." 

153 

154 

155INSTALL_YAML = """ 

156To use feature which need yaml, please install dependencies:\n 

157pip install PyYAML 

158""" 

159 

160INSTALL_TOML = """ 

161To use feature which need toml, please install dependencies:\n 

162pip install tomli 

163""" 

164 

165INSTALL_WATCHFILES = """ 

166To use restart feature, please install dependencies:\n 

167pip install watchfiles 

168""" 

169 

170SCHEMA_NOT_SUPPORTED = "`{schema_filename}` not supported. Make sure that your schema is valid and schema version supported by FastStream" 

171 

172INSTALL_FASTSTREAM_RABBIT = """ 

173To use RabbitMQ with FastStream, please install dependencies:\n 

174pip install "faststream[rabbit]" 

175""" 

176 

177INSTALL_FASTSTREAM_KAFKA = """ 

178To use Apache Kafka with FastStream, please install dependencies:\n 

179pip install "faststream[kafka]" 

180""" 

181 

182INSTALL_FASTSTREAM_CONFLUENT = """ 

183To use Confluent Kafka with FastStream, please install dependencies:\n 

184pip install "faststream[confluent]" 

185""" 

186 

187 

188INSTALL_FASTSTREAM_REDIS = """ 

189To use Redis with FastStream, please install dependencies:\n 

190pip install "faststream[redis]" 

191""" 

192 

193INSTALL_FASTSTREAM_NATS = """ 

194To use NATS with FastStream, please install dependencies:\n 

195pip install "faststream[nats]" 

196""" 

197 

198INSTALL_UVICORN = """ 

199To run FastStream ASGI App via CLI, please install uvicorn:\n 

200pip install uvicorn 

201"""