Coverage for faststream / kafka / publisher / state.py: 77%

26 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from abc import abstractmethod 

2from typing import TYPE_CHECKING, Protocol 

3 

4from faststream.exceptions import IncorrectState 

5 

6if TYPE_CHECKING: 

7 from aiokafka import AIOKafkaProducer 

8 

9 

10class ProducerState(Protocol): 

11 producer: "AIOKafkaProducer" 

12 

13 @property 

14 @abstractmethod 

15 def closed(self) -> bool: ... 

16 

17 def __bool__(self) -> bool: ... 

18 

19 async def stop(self) -> None: ... 

20 

21 async def flush(self) -> None: ... 

22 

23 

24class EmptyProducerState(ProducerState): 

25 __slots__ = () 

26 

27 closed = True 

28 

29 @property 

30 def producer(self) -> "AIOKafkaProducer": 

31 msg = "You can't use producer here, please connect broker first." 

32 raise IncorrectState(msg) 

33 

34 def __bool__(self) -> bool: 

35 return False 

36 

37 async def stop(self) -> None: 

38 pass 

39 

40 async def flush(self) -> None: 

41 pass 

42 

43 

44class RealProducer(ProducerState): 

45 __slots__ = ("producer",) 

46 

47 def __init__(self, producer: "AIOKafkaProducer") -> None: 

48 self.producer = producer 

49 

50 def __bool__(self) -> bool: 

51 return True 

52 

53 async def stop(self) -> None: 

54 await self.producer.stop() 

55 

56 @property 

57 def closed(self) -> bool: 

58 return self.producer._closed or False 

59 

60 async def flush(self) -> None: 

61 await self.producer.flush()