Coverage for faststream / rabbit / publisher / fake.py: 94%
14 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import TYPE_CHECKING, Any, Union
3from faststream._internal.endpoint.publisher.fake import FakePublisher
4from faststream.rabbit.response import RabbitPublishCommand
5from faststream.rabbit.schemas import RabbitExchange
7if TYPE_CHECKING:
8 from faststream._internal.producer import ProducerProto
9 from faststream.response.response import PublishCommand
12class RabbitFakePublisher(FakePublisher):
13 """Publisher Interface implementation to use as RPC or REPLY TO answer publisher."""
15 def __init__(
16 self,
17 producer: "ProducerProto[Any]",
18 routing_key: str,
19 app_id: str | None,
20 exchange: RabbitExchange,
21 ) -> None:
22 super().__init__(producer=producer)
23 self.routing_key = routing_key
24 self.exchange = exchange
25 self.app_id = app_id
27 def patch_command(
28 self,
29 cmd: Union["PublishCommand", "RabbitPublishCommand"],
30 ) -> "RabbitPublishCommand":
31 cmd = super().patch_command(cmd)
32 real_cmd = RabbitPublishCommand.from_cmd(cmd)
33 real_cmd.destination = self.routing_key
34 real_cmd.exchange = self.exchange
35 if self.app_id: 35 ↛ 37line 35 didn't jump to line 37 because the condition on line 35 was always true
36 real_cmd.message_options["app_id"] = self.app_id
37 return real_cmd