Coverage for faststream / confluent / publisher / state.py: 80%

25 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Protocol 

2 

3from faststream.exceptions import IncorrectState 

4 

5if TYPE_CHECKING: 

6 from faststream.confluent.helpers.client import AsyncConfluentProducer 

7 

8 

class ProducerState(Protocol):
    """Structural interface shared by the producer state objects in this module.

    Both ``EmptyProducerState`` and ``RealProducer`` provide these members,
    so calling code can hold either one without checking which it has.
    """

    @property
    def producer(self) -> "AsyncConfluentProducer":
        """Return the underlying producer object."""
        ...

    def __bool__(self) -> bool:
        """Return the truthiness of the state.

        ``EmptyProducerState`` is falsy and ``RealProducer`` is truthy.
        """
        ...

    async def ping(self, timeout: float) -> bool:
        """Return a boolean ping result for the given ``timeout``.

        NOTE(review): the unit of ``timeout`` is not shown in this module
        (presumably seconds) -- confirm against the producer implementation.
        """
        ...

    async def stop(self) -> None:
        """Stop the state; returns nothing."""
        ...

    async def flush(self) -> None:
        """Flush the state; returns nothing."""
        ...

20 

21 

class EmptyProducerState:
    """Placeholder state that holds no producer.

    Every operation is inert: the instance is falsy, ``ping`` reports
    ``False``, ``stop`` and ``flush`` do nothing, and reading ``producer``
    raises ``IncorrectState``.
    """

    # No per-instance attributes: the state carries no data.
    __slots__ = ()

    @property
    def producer(self) -> "AsyncConfluentProducer":
        """Always raise, because this state has no producer to return.

        Raises:
            IncorrectState: on every access.
        """
        msg = "You can't use producer here, please connect broker first."
        raise IncorrectState(msg)

    async def ping(self, timeout: float) -> bool:
        """Return ``False``; ``timeout`` is accepted but not used."""
        return False

    def __bool__(self) -> bool:
        """Return ``False`` so the empty state is falsy."""
        return False

    async def stop(self) -> None:
        """Do nothing."""
        pass

    async def flush(self) -> None:
        """Do nothing."""
        pass

41 

42 

class RealProducer:
    """State that wraps a producer object and delegates to it.

    The instance is always truthy; ``stop``, ``ping`` and ``flush`` await
    the method of the same name on the wrapped producer.
    """

    # Single slot: the wrapped producer is the only per-instance attribute.
    __slots__ = ("producer",)

    def __init__(self, producer: "AsyncConfluentProducer") -> None:
        """Store ``producer`` as the object all operations delegate to."""
        self.producer = producer

    def __bool__(self) -> bool:
        """Return ``True`` so this state is truthy."""
        return True

    async def stop(self) -> None:
        """Await ``stop()`` on the wrapped producer."""
        await self.producer.stop()

    async def ping(self, timeout: float) -> bool:
        """Await the wrapped producer's ``ping`` and return its result.

        ``timeout`` is forwarded unchanged as a keyword argument.
        """
        return await self.producer.ping(timeout=timeout)

    async def flush(self) -> None:
        """Await ``flush()`` on the wrapped producer."""
        await self.producer.flush()