Coverage for faststream / nats / response.py: 92%

29 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import TYPE_CHECKING, Optional, Union 

2 

3from typing_extensions import override 

4 

5from faststream.response.publish_type import PublishType 

6from faststream.response.response import PublishCommand, Response 

7 

8if TYPE_CHECKING: 

9 from faststream._internal.basic_types import SendableMessage 

10 from faststream.nats.schemas.schedule import Schedule 

11 

12 

class NatsResponse(Response):
    """Handler response that carries NATS-specific publishing options.

    The body, headers and correlation id are delegated to the base
    ``Response``; the optional ``stream`` and ``schedule`` values are kept
    on the instance and forwarded to the ``NatsPublishCommand`` produced by
    :meth:`as_publish_command`.
    """

    def __init__(
        self,
        body: "SendableMessage",
        *,
        headers: dict[str, str] | None = None,
        correlation_id: str | None = None,
        stream: str | None = None,
        schedule: Optional["Schedule"] = None,
    ) -> None:
        """Store the generic response fields and the NATS-only options.

        Args:
            body: Message payload passed through to the base ``Response``.
            headers: Optional headers passed through to the base ``Response``.
            correlation_id: Optional correlation id passed through to the
                base ``Response``.
            stream: Optional stream name forwarded to the publish command.
            schedule: Optional ``Schedule`` forwarded to the publish command.
        """
        super().__init__(body=body, headers=headers, correlation_id=correlation_id)

        # NATS-only options; the base class does not receive them.
        self.stream = stream
        self.schedule = schedule

    @override
    def as_publish_command(self) -> "NatsPublishCommand":
        """Build a ``NatsPublishCommand`` from this response.

        The command is created with ``PublishType.PUBLISH`` and an empty
        subject; the stream and schedule stored on the response are copied
        over unchanged.
        """
        command = NatsPublishCommand(
            message=self.body,
            headers=self.headers,
            correlation_id=self.correlation_id,
            _publish_type=PublishType.PUBLISH,
            # NATS-specific options. The subject is intentionally left empty
            # here (presumably filled in later by the publisher -- confirm).
            subject="",
            stream=self.stream,
            schedule=self.schedule,
        )
        return command

43 

44 

class NatsPublishCommand(PublishCommand):
    """Publish command extended with NATS-specific options.

    On top of the fields handled by the base ``PublishCommand`` (body,
    destination, correlation id, headers, reply-to and publish type), the
    command stores a stream name, a timeout and an optional ``Schedule``.
    """

    def __init__(
        self,
        message: "SendableMessage",
        *,
        subject: str = "",
        correlation_id: str | None = None,
        headers: dict[str, str] | None = None,
        reply_to: str = "",
        stream: str | None = None,
        timeout: float = 0.5,
        schedule: Optional["Schedule"] = None,
        _publish_type: PublishType,
    ) -> None:
        """Initialise the command.

        Args:
            message: Payload, passed to the base class as ``body``.
            subject: Passed to the base class as ``destination``.
            correlation_id: Optional correlation id for the base class.
            headers: Optional headers for the base class.
            reply_to: Reply subject for the base class.
            stream: Optional stream name stored on the command.
            timeout: Timeout stored on the command (presumably seconds --
                confirm against the producer that consumes it).
            schedule: Optional ``Schedule`` stored on the command.
            _publish_type: Publish type forwarded to the base class.
        """
        base_options = {
            "body": message,
            "destination": subject,
            "correlation_id": correlation_id,
            "headers": headers,
            "reply_to": reply_to,
            "_publish_type": _publish_type,
        }
        super().__init__(**base_options)

        # NATS-only options; the base class does not receive them.
        self.stream = stream
        self.timeout = timeout
        self.schedule = schedule

    def headers_to_publish(self, *, js: bool = False) -> dict[str, str]:
        """Return the headers to send with the message.

        Service headers are assembled first and then merged with
        ``self.headers``; on a key collision the value from ``self.headers``
        wins because it is the right-hand operand of the dict union.

        Args:
            js: When true and ``reply_to`` is set, ``reply_to`` is added as
                a header (presumably the JetStream publishing path --
                confirm against callers).

        Returns:
            A new dict combining the service headers and ``self.headers``.
        """
        service_headers: dict[str, str] = {}

        if self.correlation_id:
            service_headers["correlation_id"] = self.correlation_id

        if js and self.reply_to:
            service_headers["reply_to"] = self.reply_to

        if self.schedule:
            # Both scheduling headers are emitted together: the "@at" time
            # in ISO format and the schedule target.
            service_headers.update({
                "Nats-Schedule": f"@at {self.schedule.time.isoformat()}",
                "Nats-Schedule-Target": self.schedule.target,
            })

        return service_headers | self.headers

    @classmethod
    def from_cmd(
        cls,
        cmd: Union["PublishCommand", "NatsPublishCommand"],
    ) -> "NatsPublishCommand":
        """Coerce a generic publish command into a ``NatsPublishCommand``.

        A command that is already a ``NatsPublishCommand`` is returned as-is
        (the same object, not a copy). Any other command is converted by
        copying its body, destination, correlation id, headers, reply-to and
        publish type; the NATS-only options keep their defaults.
        """
        if not isinstance(cmd, NatsPublishCommand):
            return cls(
                message=cmd.body,
                subject=cmd.destination,
                correlation_id=cmd.correlation_id,
                headers=cmd.headers,
                reply_to=cmd.reply_to,
                _publish_type=cmd.publish_type,
            )

        # NOTE: Should return a copy probably.
        return cmd

    def __repr__(self) -> str:
        """Return a debug representation listing the command's fields.

        ``stream`` and ``reply_to`` are included only when they are truthy.
        """
        parts = [f"body='{self.body}'", f"subject='{self.destination}'"]

        if self.stream:
            parts.append(f"stream={self.stream}")

        if self.reply_to:
            parts.append(f"reply_to='{self.reply_to}'")

        parts += [
            f"headers={self.headers}",
            f"correlation_id='{self.correlation_id}'",
            f"publish_type={self.publish_type}",
        ]

        joined = ", ".join(parts)
        return f"{self.__class__.__name__}({joined})"