Coverage for faststream / rabbit / helpers / state.py: 80%
10 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from dataclasses import dataclass
2from typing import TYPE_CHECKING, Protocol
4from faststream.exceptions import IncorrectState
6if TYPE_CHECKING:
7 from aio_pika import RobustConnection
class ConnectionState(Protocol):
    """Structural interface for objects that expose a ``connection`` property.

    Any class providing a read-only ``connection`` attribute annotated as
    ``RobustConnection`` satisfies this protocol; no inheritance is needed.
    """

    @property
    def connection(self) -> "RobustConnection":
        """Return the connection held by this state."""
        ...
class EmptyConnectionState:
    """State object whose ``connection`` property always raises.

    Reading ``connection`` on this state raises ``IncorrectState`` instead of
    returning a value.
    """

    # No per-instance attributes are stored, so no instance ``__dict__`` is needed.
    __slots__ = ()

    @property
    def connection(self) -> "RobustConnection":
        """Always raise ``IncorrectState``; this state holds no connection.

        Raises:
            IncorrectState: on every access.
        """
        msg = "You should connect broker first."
        raise IncorrectState(msg)
24@dataclass(slots=True)
25class ConnectedState:
26 connection: "RobustConnection"