Coverage for faststream / rabbit / helpers / state.py: 80%

10 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from dataclasses import dataclass 

2from typing import TYPE_CHECKING, Protocol 

3 

4from faststream.exceptions import IncorrectState 

5 

6if TYPE_CHECKING: 

7 from aio_pika import RobustConnection 

8 

9 

10class ConnectionState(Protocol): 

11 @property 

12 def connection(self) -> "RobustConnection": ... 

13 

14 

15class EmptyConnectionState: 

16 __slots__ = () 

17 

18 @property 

19 def connection(self) -> "RobustConnection": 

20 msg = "You should connect broker first." 

21 raise IncorrectState(msg) 

22 

23 

24@dataclass(slots=True) 

25class ConnectedState: 

26 connection: "RobustConnection"