Coverage for faststream / kafka / publisher / state.py: 77%
26 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from abc import abstractmethod
2from typing import TYPE_CHECKING, Protocol
4from faststream.exceptions import IncorrectState
6if TYPE_CHECKING:
7 from aiokafka import AIOKafkaProducer
class ProducerState(Protocol):
    """Interface shared by the producer-state objects of the Kafka publisher.

    An implementation exposes a ``producer`` handle, reports whether it is
    ``closed``, defines its own truth value, and provides awaitable
    ``stop`` and ``flush`` operations.
    """

    # Producer handle exposed by the state object.
    producer: "AIOKafkaProducer"

    @property
    @abstractmethod
    def closed(self) -> bool:
        """Tell whether the state counts as closed."""

    def __bool__(self) -> bool:
        """Give the truth value of the state."""

    async def stop(self) -> None:
        """Stop the producer behind this state."""

    async def flush(self) -> None:
        """Flush the producer behind this state."""
class EmptyProducerState(ProducerState):
    """Placeholder state that carries no producer.

    The state is falsy and reports ``closed`` as ``True``. ``stop`` and
    ``flush`` do nothing, and reading ``producer`` raises
    ``IncorrectState``.
    """

    __slots__ = ()

    closed = True

    @property
    def producer(self) -> "AIOKafkaProducer":
        """Always raise ``IncorrectState``; there is no producer to return."""
        raise IncorrectState(
            "You can't use producer here, please connect broker first.",
        )

    def __bool__(self) -> bool:
        """Return ``False`` unconditionally."""
        return False

    async def stop(self) -> None:
        """Do nothing."""
        return None

    async def flush(self) -> None:
        """Do nothing."""
        return None
class RealProducer(ProducerState):
    """State that wraps the ``AIOKafkaProducer`` passed to the constructor.

    The state is truthy; ``stop`` and ``flush`` are forwarded to the
    wrapped producer.
    """

    __slots__ = ("producer",)

    def __init__(self, producer: "AIOKafkaProducer") -> None:
        """Keep ``producer`` on the instance."""
        self.producer = producer

    def __bool__(self) -> bool:
        """Return ``True`` unconditionally."""
        return True

    @property
    def closed(self) -> bool:
        """Return the producer's ``_closed`` flag, or ``False`` if it is falsy."""
        # Reads a private attribute of the producer; any falsy value
        # (for example ``None``) is reported as ``False``.
        flag = self.producer._closed
        return flag if flag else False

    async def stop(self) -> None:
        """Await ``stop()`` on the wrapped producer."""
        await self.producer.stop()

    async def flush(self) -> None:
        """Await ``flush()`` on the wrapped producer."""
        await self.producer.flush()