Coverage for faststream / nats / helpers / state.py: 87%

15 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import Protocol, TypeVar 

2 

3from nats.aio.client import Client 

4from nats.js import JetStreamContext 

5 

6from faststream.exceptions import IncorrectState 

7 

8ClientT = TypeVar("ClientT", Client, JetStreamContext) 

9 

10 

11class ConnectionState(Protocol[ClientT]): 

12 connection: ClientT 

13 

14 

15class EmptyConnectionState(ConnectionState[ClientT]): 

16 __slots__ = () 

17 

18 @property 

19 def connection(self) -> ClientT: 

20 raise IncorrectState 

21 

22 @connection.setter 

23 def connection(self, v: ClientT) -> None: 

24 raise IncorrectState 

25 

26 

27class ConnectedState(ConnectionState[ClientT]): 

28 __slots__ = ("connection",) 

29 

30 def __init__(self, connection: ClientT) -> None: 

31 self.connection: ClientT = connection