Coverage for faststream / rabbit / response.py: 95%

34 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Union 

2 

3from typing_extensions import Unpack, override 

4 

5from faststream.rabbit.schemas.exchange import RabbitExchange 

6from faststream.response import PublishCommand, Response 

7from faststream.response.publish_type import PublishType 

8 

9if TYPE_CHECKING: 

10 from aio_pika.abc import TimeoutType 

11 

12 from faststream.rabbit.publisher.options import ( 

13 BasicMessageOptions, 

14 MessageOptions, 

15 PublishOptions, 

16 ) 

17 from faststream.rabbit.types import AioPikaSendableMessage 

18 

19 

class RabbitResponse(Response):
    """Handler response that also carries RabbitMQ publish settings.

    Besides what the base ``Response`` stores (body, headers and
    correlation id), an instance keeps an optional target exchange, the
    publish flags (``mandatory``, ``immediate``, ``timeout``) and whatever
    message options remain after the base-class values are taken out.
    """

    def __init__(
        self,
        body: "AioPikaSendableMessage",
        *,
        timeout: "TimeoutType" = None,
        mandatory: bool = True,
        immediate: bool = False,
        exchange: RabbitExchange | str | None = None,
        **message_options: Unpack["MessageOptions"],
    ) -> None:
        """Store the message body together with its publish configuration.

        Args:
            body: Payload handed to the base ``Response``.
            timeout: Kept under the ``"timeout"`` key of ``publish_options``.
            mandatory: Kept under the ``"mandatory"`` key of
                ``publish_options``.
            immediate: Kept under the ``"immediate"`` key of
                ``publish_options``.
            exchange: Target exchange. ``None`` is stored unchanged; any
                other value is passed through ``RabbitExchange.validate``.
            **message_options: Extra message options. ``headers`` and
                ``correlation_id`` are removed and given to the base class;
                everything else is kept in ``self.message_options``.
        """
        # These two belong to the base class, so strip them from the options
        # before the leftovers are stored on the instance.
        correlation_id = message_options.pop("correlation_id", None)
        headers = message_options.pop("headers", {})

        super().__init__(
            body=body,
            headers=headers,
            correlation_id=correlation_id,
        )

        self.publish_options: PublishOptions = {
            "mandatory": mandatory,
            "immediate": immediate,
            "timeout": timeout,
        }
        self.message_options: BasicMessageOptions = message_options
        self.exchange = (
            RabbitExchange.validate(exchange) if exchange is not None else None
        )

    @override
    def as_publish_command(self) -> "RabbitPublishCommand":
        """Build a ``RabbitPublishCommand`` from this response.

        The command is created with ``PublishType.PUBLISH`` and an empty
        routing key; the exchange, publish options, headers, correlation id
        and remaining message options are forwarded from the instance.
        """
        command = RabbitPublishCommand(
            message=self.body,
            _publish_type=PublishType.PUBLISH,
            routing_key="",
            exchange=self.exchange,
            **self.publish_options,
            headers=self.headers,
            correlation_id=self.correlation_id,
            **self.message_options,
        )
        return command

60 

61 

class RabbitPublishCommand(PublishCommand):
    """Publish command extended with RabbitMQ-specific settings.

    On top of the fields handled by the base ``PublishCommand``, an instance
    keeps the target exchange, the publish timeout, the ``mandatory`` /
    ``immediate`` flags and any remaining message options.
    """

    def __init__(
        self,
        message: "AioPikaSendableMessage",
        *,
        _publish_type: PublishType,
        routing_key: str = "",
        exchange: RabbitExchange | None = None,
        # publish kwargs
        mandatory: bool = True,
        immediate: bool = False,
        timeout: "TimeoutType" = None,
        **message_options: Unpack["MessageOptions"],
    ) -> None:
        """Initialise the command.

        Args:
            message: Payload, forwarded to the base class as ``body``.
            _publish_type: Publish type forwarded to the base class.
            routing_key: Forwarded to the base class as ``destination``.
            exchange: Target exchange; stored as given and exposed through
                the ``exchange`` property.
            mandatory: Kept under the ``"mandatory"`` key of
                ``publish_options``.
            immediate: Kept under the ``"immediate"`` key of
                ``publish_options``.
            timeout: Stored on the instance as ``self.timeout``.
            **message_options: Extra message options. ``headers``,
                ``reply_to`` and ``correlation_id`` are removed and given to
                the base class; the rest is kept in ``self.message_options``.
        """
        # Values owned by the base class are taken out of the options first.
        correlation_id = message_options.pop("correlation_id", None)
        # A missing or falsy reply_to is replaced by an empty string.
        reply_to = message_options.pop("reply_to", None) or ""
        headers = message_options.pop("headers", {})

        super().__init__(
            body=message,
            destination=routing_key,
            correlation_id=correlation_id,
            headers=headers,
            reply_to=reply_to,
            _publish_type=_publish_type,
        )

        self.timeout = timeout
        self._exchange = exchange

        self.publish_options: PublishOptions = {
            "mandatory": mandatory,
            "immediate": immediate,
        }
        self.message_options: BasicMessageOptions = message_options

    @property
    def exchange(self) -> RabbitExchange:
        """Return the stored exchange, or a new ``RabbitExchange()`` if unset."""
        if self._exchange is None:
            return RabbitExchange()
        return self._exchange

    @exchange.setter
    def exchange(self, value: RabbitExchange) -> None:
        """Replace the stored exchange with ``value``."""
        self._exchange = value

    @classmethod
    def from_cmd(
        cls,
        cmd: Union["PublishCommand", "RabbitPublishCommand"],
    ) -> "RabbitPublishCommand":
        """Return ``cmd`` as a ``RabbitPublishCommand``.

        Any other ``PublishCommand`` is converted by copying its body,
        destination, correlation id, headers, reply-to and publish type into
        a new instance of ``cls``. A command that already is a
        ``RabbitPublishCommand`` is returned as the same object.
        """
        if not isinstance(cmd, RabbitPublishCommand):
            return cls(
                message=cmd.body,
                routing_key=cmd.destination,
                correlation_id=cmd.correlation_id,
                headers=cmd.headers,
                reply_to=cmd.reply_to,
                _publish_type=cmd.publish_type,
            )

        # NOTE: Should return a copy probably.
        return cmd