Coverage for faststream / exceptions.py: 91%
58 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Iterable
2from pprint import pformat
3from typing import Any
6class FastStreamException(Exception): # noqa: N818
7 """Basic FastStream exception class."""
10class IgnoredException(FastStreamException):
11 """Basic Exception class ignoring by watcher context and log middleware."""
14class StopConsume(IgnoredException):
15 """Raise it to stop Handler consuming."""
17 def __str__(self) -> str:
18 return "Consumer was stopped"
21class StopApplication(IgnoredException, SystemExit):
22 """Raise it to stop FastStream application."""
24 def __str__(self) -> str:
25 return "Application was stopped"
28class HandlerException(IgnoredException):
29 """Base Handler Exception."""
32class SkipMessage(HandlerException):
33 """Watcher Instruction to skip message."""
35 def __str__(self) -> str:
36 return "Message was skipped"
39class AckMessage(HandlerException):
40 """Exception raised to acknowledge a message immediately.
42 This exception can be used to ack a message with additional options.
43 To watch all allowed parameters, please take a look at your broker `message.ack(**extra_options)` method
44 signature.
46 Args:
47 extra_options (Any): Additional parameters that will be passed to `message.ack(**extra_options)` method.
48 """
50 def __init__(self, **extra_options: Any) -> None:
51 self.extra_options = extra_options
52 super().__init__()
54 def __str__(self) -> str:
55 return "Message was acked"
58class NackMessage(HandlerException):
59 """Exception raised to negatively acknowledge a message immediately.
61 This exception can be used to nack a message with additional options.
62 To watch all allowed parameters, please take a look to your broker's `message.nack(**extra_options)` method
63 signature.
65 Args:
66 kwargs (Any): Additional parameters that will be passed to `message.nack(**extra_options)` method.
67 """
69 def __init__(self, **kwargs: Any) -> None:
70 self.extra_options = kwargs
71 super().__init__()
73 def __str__(self) -> str:
74 return "Message was nacked"
77class RejectMessage(HandlerException):
78 """Exception raised to reject a message immediately.
80 This exception can be used to reject a message with additional options.
81 To watch all allowed parameters, please take a look to your broker's `message.reject(**extra_options)` method
82 signature.
84 Args:
85 kwargs (Any): Additional parameters that will be passed to `message.reject(**extra_options)` method.
86 """
88 def __init__(self, **kwargs: Any) -> None:
89 self.extra_options = kwargs
90 super().__init__()
92 def __str__(self) -> str:
93 return "Message was rejected"
96class SetupError(FastStreamException, ValueError):
97 """Exception to raise at wrong method usage."""
100class StartupValidationError(FastStreamException, ValueError):
101 """Exception to raise at startup hook validation error."""
103 def __init__(
104 self,
105 missed_fields: Iterable[str] = (),
106 invalid_fields: Iterable[str] = (),
107 ) -> None:
108 self.missed_fields = missed_fields
109 self.invalid_fields = invalid_fields
111 def __str__(self) -> str:
112 return (
113 f"\n Incorrect options `{' / '.join(f'--{i}' for i in (*self.missed_fields, *self.invalid_fields))}`"
114 "\n You registered extra options in your application `lifespan/on_startup` hook, but set them wrong in CLI."
115 )
118class FeatureNotSupportedException(FastStreamException, NotImplementedError): # noqa: N818
119 """Raises at planned NotImplemented operation call."""
122class SubscriberNotFound(FastStreamException):
123 """Raises as a service message or in tests."""
126class IncorrectState(FastStreamException):
127 """Raises in FSM at wrong state calling."""
130class ContextError(FastStreamException, KeyError):
131 """Raises if context exception occurred."""
133 def __init__(self, context: Any, field: str) -> None:
134 self.context = context
135 self.field = field
137 def __str__(self) -> str:
138 return "".join(
139 (
140 f"\n Key `{self.field}` not found in the context\n ",
141 pformat(self.context),
142 ),
143 )
146WRONG_PUBLISH_ARGS = SetupError(
147 "You should use `reply_to` to send response to long-living queue "
148 "and `rpc` to get response in sync mode.",
149)
152NOT_CONNECTED_YET = "Please, `connect()` the broker first."
155INSTALL_YAML = """
156To use feature which need yaml, please install dependencies:\n
157pip install PyYAML
158"""
160INSTALL_TOML = """
161To use feature which need toml, please install dependencies:\n
162pip install tomli
163"""
165INSTALL_WATCHFILES = """
166To use restart feature, please install dependencies:\n
167pip install watchfiles
168"""
170SCHEMA_NOT_SUPPORTED = "`{schema_filename}` not supported. Make sure that your schema is valid and schema version supported by FastStream"
172INSTALL_FASTSTREAM_RABBIT = """
173To use RabbitMQ with FastStream, please install dependencies:\n
174pip install "faststream[rabbit]"
175"""
177INSTALL_FASTSTREAM_KAFKA = """
178To use Apache Kafka with FastStream, please install dependencies:\n
179pip install "faststream[kafka]"
180"""
182INSTALL_FASTSTREAM_CONFLUENT = """
183To use Confluent Kafka with FastStream, please install dependencies:\n
184pip install "faststream[confluent]"
185"""
188INSTALL_FASTSTREAM_REDIS = """
189To use Redis with FastStream, please install dependencies:\n
190pip install "faststream[redis]"
191"""
193INSTALL_FASTSTREAM_NATS = """
194To use NATS with FastStream, please install dependencies:\n
195pip install "faststream[nats]"
196"""
198INSTALL_UVICORN = """
199To run FastStream ASGI App via CLI, please install uvicorn:\n
200pip install uvicorn
201"""