Coverage for faststream / rabbit / publisher / fake.py: 94%

14 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Any, Union 

2 

3from faststream._internal.endpoint.publisher.fake import FakePublisher 

4from faststream.rabbit.response import RabbitPublishCommand 

5from faststream.rabbit.schemas import RabbitExchange 

6 

7if TYPE_CHECKING: 

8 from faststream._internal.producer import ProducerProto 

9 from faststream.response.response import PublishCommand 

10 

11 

12class RabbitFakePublisher(FakePublisher): 

13 """Publisher Interface implementation to use as RPC or REPLY TO answer publisher.""" 

14 

15 def __init__( 

16 self, 

17 producer: "ProducerProto[Any]", 

18 routing_key: str, 

19 app_id: str | None, 

20 exchange: RabbitExchange, 

21 ) -> None: 

22 super().__init__(producer=producer) 

23 self.routing_key = routing_key 

24 self.exchange = exchange 

25 self.app_id = app_id 

26 

27 def patch_command( 

28 self, 

29 cmd: Union["PublishCommand", "RabbitPublishCommand"], 

30 ) -> "RabbitPublishCommand": 

31 cmd = super().patch_command(cmd) 

32 real_cmd = RabbitPublishCommand.from_cmd(cmd) 

33 real_cmd.destination = self.routing_key 

34 real_cmd.exchange = self.exchange 

35 if self.app_id: 35 ↛ 37line 35 didn't jump to line 37 because the condition on line 35 was always true

36 real_cmd.message_options["app_id"] = self.app_id 

37 return real_cmd