Coverage for faststream / confluent / publisher / state.py: 80%
25 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Protocol
3from faststream.exceptions import IncorrectState
5if TYPE_CHECKING:
6 from faststream.confluent.helpers.client import AsyncConfluentProducer
9class ProducerState(Protocol):
10 @property
11 def producer(self) -> "AsyncConfluentProducer": ...
13 def __bool__(self) -> bool: ...
15 async def ping(self, timeout: float) -> bool: ...
17 async def stop(self) -> None: ...
19 async def flush(self) -> None: ...
22class EmptyProducerState:
23 __slots__ = ()
25 @property
26 def producer(self) -> "AsyncConfluentProducer":
27 msg = "You can't use producer here, please connect broker first."
28 raise IncorrectState(msg)
30 async def ping(self, timeout: float) -> bool:
31 return False
33 def __bool__(self) -> bool:
34 return False
36 async def stop(self) -> None:
37 pass
39 async def flush(self) -> None:
40 pass
43class RealProducer:
44 __slots__ = ("producer",)
46 def __init__(self, producer: "AsyncConfluentProducer") -> None:
47 self.producer = producer
49 def __bool__(self) -> bool:
50 return True
52 async def stop(self) -> None:
53 await self.producer.stop()
55 async def ping(self, timeout: float) -> bool:
56 return await self.producer.ping(timeout=timeout)
58 async def flush(self) -> None:
59 await self.producer.flush()