Coverage for faststream / _internal / endpoint / publisher / fake.py: 87%

13 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from abc import abstractmethod 

2from collections.abc import Iterable 

3from functools import partial 

4from typing import TYPE_CHECKING, Any 

5 

6from faststream._internal.basic_types import SendableMessage 

7from faststream.response.publish_type import PublishType 

8 

9from .proto import PublisherProto 

10 

11if TYPE_CHECKING: 

12 from faststream._internal.basic_types import AsyncFunc 

13 from faststream._internal.producer import ProducerProto 

14 from faststream._internal.types import PublisherMiddleware 

15 from faststream.response.response import PublishCommand 

16 

17 

18class FakePublisher(PublisherProto): 

19 """Publisher Interface implementation to use as RPC or REPLY TO answer publisher.""" 

20 

21 def __init__( 

22 self, 

23 *, 

24 producer: "ProducerProto[Any]", 

25 ) -> None: 

26 """Initialize an object.""" 

27 self._producer = producer 

28 

29 @abstractmethod 

30 def patch_command(self, cmd: "PublishCommand") -> "PublishCommand": 

31 cmd.publish_type = PublishType.REPLY 

32 return cmd 

33 

34 async def _publish( 

35 self, 

36 cmd: "PublishCommand", 

37 *, 

38 _extra_middlewares: Iterable["PublisherMiddleware"], 

39 ) -> Any: 

40 """This method should be called in subscriber flow only.""" 

41 cmd = self.patch_command(cmd) 

42 

43 call: AsyncFunc = self._producer.publish 

44 for m in _extra_middlewares: 

45 call = partial(m, call) 

46 

47 return await call(cmd) 

48 

49 async def publish( 

50 self, 

51 message: SendableMessage, 

52 /, 

53 *, 

54 correlation_id: str | None = None, 

55 ) -> Any | None: 

56 msg = ( 

57 f"`{self.__class__.__name__}` can be used only to publish " 

58 "a response for `reply-to` or `RPC` messages." 

59 ) 

60 raise NotImplementedError(msg) 

61 

62 async def request( 

63 self, 

64 message: "SendableMessage", 

65 /, 

66 *, 

67 correlation_id: str | None = None, 

68 ) -> Any: 

69 msg = ( 

70 f"`{self.__class__.__name__}` can be used only to publish " 

71 "a response for `reply-to` or `RPC` messages." 

72 ) 

73 raise NotImplementedError(msg)