Coverage for faststream / _internal / producer.py: 69%

26 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from abc import abstractmethod 

2from typing import TYPE_CHECKING, Any, Protocol 

3 

4from faststream._internal.types import PublishCommandType_contra 

5from faststream.exceptions import IncorrectState 

6 

7if TYPE_CHECKING: 

8 from faststream._internal.types import AsyncCallable 

9 from faststream.response import PublishCommand 

10 

11 

12class ProducerProto(Protocol[PublishCommandType_contra]): 

13 _parser: "AsyncCallable" 

14 _decoder: "AsyncCallable" 

15 

16 @abstractmethod 

17 async def publish(self, cmd: "PublishCommandType_contra") -> Any: 

18 """Publishes a message asynchronously.""" 

19 ... 

20 

21 @abstractmethod 

22 async def request(self, cmd: "PublishCommandType_contra") -> Any: 

23 """Publishes a message synchronously.""" 

24 ... 

25 

26 @abstractmethod 

27 async def publish_batch(self, cmd: "PublishCommandType_contra") -> Any: 

28 """Publishes a messages batch asynchronously.""" 

29 ... 

30 

31 

32class ProducerFactory(Protocol): 

33 def __call__( 

34 self, 

35 parser: "AsyncCallable", 

36 decoder: "AsyncCallable", 

37 ) -> ProducerProto: ... 

38 

39 

40class ProducerUnset(ProducerProto): 

41 msg = "Producer is unset yet. You should set producer in broker initial method." 

42 

43 def __bool__(self) -> bool: 

44 return False 

45 

46 @property 

47 def _parser(self) -> "AsyncCallable": 

48 raise IncorrectState(self.msg) 

49 

50 @_parser.setter 

51 def _parser(self, value: "AsyncCallable", /) -> "AsyncCallable": 

52 raise IncorrectState(self.msg) 

53 

54 @property 

55 def _decoder(self) -> "AsyncCallable": 

56 raise IncorrectState(self.msg) 

57 

58 @_decoder.setter 

59 def _decoder(self, value: "AsyncCallable", /) -> "AsyncCallable": 

60 raise IncorrectState(self.msg) 

61 

62 async def publish(self, cmd: "PublishCommand") -> Any | None: 

63 raise IncorrectState(self.msg) 

64 

65 async def request(self, cmd: "PublishCommand") -> Any: 

66 raise IncorrectState(self.msg) 

67 

68 async def publish_batch(self, cmd: "PublishCommand") -> None: 

69 raise IncorrectState(self.msg)