Coverage for faststream / kafka / response.py: 98%

39 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Any, Union 

2 

3from typing_extensions import override 

4 

5from faststream.response.publish_type import PublishType 

6from faststream.response.response import ( 

7 BatchPublishCommand, 

8 PublishCommand, 

9 Response, 

10 extract_per_message_keys_and_bodies, 

11 key_for_index, 

12) 

13 

14if TYPE_CHECKING: 

15 from faststream._internal.basic_types import SendableMessage 

16 

17 

class KafkaResponse(Response):
    """Outgoing Kafka message bundled with its Kafka-only attributes.

    There are two ways to use it:
        1. Return it from a handler so it is sent as the response message.
        2. Pass it to publish_batch() to set per-message attributes
           (key, headers, etc.).

    When publishing, the alias `KafkaPublishMessage` reads more naturally.
    """

    def __init__(
        self,
        body: "SendableMessage",
        *,
        headers: dict[str, Any] | None = None,
        correlation_id: str | None = None,
        timestamp_ms: int | None = None,
        key: bytes | None = None,
    ) -> None:
        """Store the generic response fields plus the Kafka-specific ones.

        Args:
            body: Message payload, forwarded to the base ``Response``.
            headers: Optional message headers, forwarded to the base class.
            correlation_id: Optional correlation id, forwarded to the base class.
            timestamp_ms: Optional timestamp kept on the instance as-is.
            key: Optional message key kept on the instance as-is.
        """
        # The base class owns body / headers / correlation_id.
        super().__init__(body=body, headers=headers, correlation_id=correlation_id)

        # Kafka-only attributes live on this subclass.
        self.timestamp_ms = timestamp_ms
        self.key = key

    @override
    def get_publish_key(self) -> bytes | None:
        """Return the key this message should be published with."""
        return self.key

    @override
    def as_publish_command(self) -> "KafkaPublishCommand":
        """Build a ``KafkaPublishCommand`` carrying this response's data.

        The command is created with an empty topic and
        ``PublishType.PUBLISH``; body, headers, correlation id, key and
        timestamp are copied from this instance.
        """
        return KafkaPublishCommand(
            self.body,
            # Kafka specific
            # NOTE(review): topic is left empty here - presumably the
            # publisher fills in the destination later; confirm at call site.
            topic="",
            key=self.key,
            timestamp_ms=self.timestamp_ms,
            # Generic fields
            correlation_id=self.correlation_id,
            headers=self.headers,
            _publish_type=PublishType.PUBLISH,
        )

63 

64 

class KafkaPublishCommand(BatchPublishCommand):
    """Publish command extended with Kafka-specific options.

    On top of what ``BatchPublishCommand`` stores, an instance keeps the
    message key, partition, timestamp, the ``no_confirm`` flag, a request
    timeout and any per-message keys extracted from the batch bodies.
    """

    def __init__(
        self,
        message: "SendableMessage",
        /,
        *messages: "SendableMessage",
        topic: str,
        _publish_type: PublishType,
        key: bytes | Any | None = None,
        partition: int | None = None,
        timestamp_ms: int | None = None,
        headers: dict[str, str] | None = None,
        correlation_id: str | None = None,
        reply_to: str = "",
        no_confirm: bool = False,
        timeout: float = 0.5,
    ) -> None:
        """Initialise the command.

        Args:
            message: First message body (positional-only).
            *messages: Additional message bodies.
            topic: Passed to the base class as ``destination``.
            _publish_type: Publish type forwarded to the base class.
            key: Command-level message key.
            partition: Optional partition, stored as-is.
            timestamp_ms: Optional timestamp, stored as-is.
            headers: Optional headers forwarded to the base class.
            correlation_id: Optional correlation id forwarded to the base class.
            reply_to: Reply destination forwarded to the base class.
            no_confirm: Stored as-is on the instance.
            timeout: Request option, stored as-is on the instance.
        """
        super().__init__(
            message,
            *messages,
            destination=topic,
            reply_to=reply_to,
            correlation_id=correlation_id,
            headers=headers,
            _publish_type=_publish_type,
        )

        self.key = key
        self.partition = partition
        self.timestamp_ms = timestamp_ms
        self.no_confirm = no_confirm

        # request option
        self.timeout = timeout

        # Per-message keys support: the helper returns the extracted keys and,
        # when the bodies had to be rewritten, their replacement (else None).
        per_message_keys, replacement_bodies = extract_per_message_keys_and_bodies(
            self.batch_bodies,
        )
        if replacement_bodies is not None:
            self.batch_bodies = replacement_bodies
        self._per_message_keys = per_message_keys

    @classmethod
    def from_cmd(
        cls,
        cmd: Union["PublishCommand", "KafkaPublishCommand"],
        *,
        batch: bool = False,
    ) -> "KafkaPublishCommand":
        """Convert a publish command into a ``KafkaPublishCommand``.

        A command that is already a ``KafkaPublishCommand`` is returned
        unchanged (the same object, and ``batch`` is not consulted).
        Otherwise a new command is built from ``cmd``'s destination,
        correlation id, headers, reply_to and publish type; every
        Kafka-specific option keeps its default.
        """
        if isinstance(cmd, KafkaPublishCommand):
            # NOTE: Should return a copy probably.
            return cmd

        first_body, remaining_bodies = cls._parse_bodies(cmd.body, batch=batch)

        return cls(
            first_body,
            *remaining_bodies,
            topic=cmd.destination,
            reply_to=cmd.reply_to,
            headers=cmd.headers,
            correlation_id=cmd.correlation_id,
            _publish_type=cmd.publish_type,
        )

    def key_for(self, index: int) -> Any | None:
        """Return the key for the message at ``index``.

        Delegates to ``key_for_index`` with the per-message keys and the
        command-level key.
        """
        return key_for_index(self._per_message_keys, self.key, index)

    def headers_to_publish(self) -> dict[str, str]:
        """Return the headers to send with the message.

        ``correlation_id`` and ``reply_to`` are included only when truthy;
        the result is then merged with ``self.headers``, whose entries win
        on a key collision.
        """
        candidates = (
            ("correlation_id", self.correlation_id),
            ("reply_to", self.reply_to),
        )
        service_headers = {name: value for name, value in candidates if value}

        return service_headers | self.headers

142 

143 

# Semantic alias for publish operations.
# `KafkaPublishMessage` is the very same class as `KafkaResponse`; the name is
# simply more intuitive when the object is passed to publish_batch() rather
# than returned from a handler.
KafkaPublishMessage = KafkaResponse