Coverage for faststream / nats / helpers / state.py: 87%
15 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import Protocol, TypeVar
3from nats.aio.client import Client
4from nats.js import JetStreamContext
6from faststream.exceptions import IncorrectState
# Constrained type variable: it may only be bound to `Client` or to
# `JetStreamContext`, the two types the state classes are parameterized by.
ClientT = TypeVar("ClientT", Client, JetStreamContext)
class ConnectionState(Protocol[ClientT]):
    """Structural interface for an object exposing a ``connection`` attribute.

    The attribute's type is the ``ClientT`` the state is parameterized with.
    """

    # The only member required by this protocol.
    connection: ClientT
class EmptyConnectionState(ConnectionState[ClientT]):
    """Connection state for which no connection can be read or assigned.

    Both getting and setting ``connection`` raise ``IncorrectState``.
    """

    # Declares no slots of its own: this state stores no data.
    __slots__ = ()

    @property
    def connection(self) -> ClientT:
        """Always raise ``IncorrectState``; there is no value to return."""
        raise IncorrectState()

    @connection.setter
    def connection(self, v: ClientT) -> None:
        # Assignment is rejected in the same way as a read.
        raise IncorrectState()
class ConnectedState(ConnectionState[ClientT]):
    """Connection state that holds the connection given at construction."""

    __slots__ = ("connection",)

    def __init__(self, connection: ClientT) -> None:
        """Store *connection* on the instance's ``connection`` attribute."""
        # No inline annotation needed: the parameter is already typed and the
        # attribute is declared as ``connection: ClientT`` on ConnectionState.
        self.connection = connection