Coverage for faststream / middlewares / acknowledgement / middleware.py: 91%

51 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import logging 

2from typing import TYPE_CHECKING, Any, Optional 

3 

4from faststream._internal.middlewares import BaseMiddleware 

5from faststream.exceptions import ( 

6 AckMessage, 

7 HandlerException, 

8 NackMessage, 

9 RejectMessage, 

10) 

11 

12from .config import AckPolicy 

13 

14if TYPE_CHECKING: 

15 from types import TracebackType 

16 

17 from faststream._internal.basic_types import AsyncFuncAny 

18 from faststream._internal.context.repository import ContextRepo 

19 from faststream._internal.logger import LoggerState 

20 from faststream.message import StreamMessage 

21 

22 

23class AcknowledgementMiddleware: 

24 def __init__( 

25 self, 

26 logger: "LoggerState", 

27 ack_policy: "AckPolicy", 

28 extra_options: dict[str, Any], 

29 ) -> None: 

30 self.ack_policy = ack_policy 

31 self.extra_options = extra_options 

32 self.logger = logger 

33 

34 def __call__( 

35 self, 

36 msg: Any | None, 

37 context: "ContextRepo", 

38 ) -> "_AcknowledgementMiddleware": 

39 return _AcknowledgementMiddleware( 

40 msg, 

41 logger=self.logger, 

42 ack_policy=self.ack_policy, 

43 extra_options=self.extra_options, 

44 context=context, 

45 ) 

46 

47 

48class _AcknowledgementMiddleware(BaseMiddleware): 

49 def __init__( 

50 self, 

51 msg: Any | None, 

52 /, 

53 *, 

54 logger: "LoggerState", 

55 context: "ContextRepo", 

56 extra_options: dict[str, Any], 

57 # can't be created with AckPolicy.MANUAL 

58 ack_policy: AckPolicy, 

59 ) -> None: 

60 super().__init__(msg, context=context) 

61 

62 self.ack_policy = ack_policy 

63 self.extra_options = extra_options 

64 self.logger = logger 

65 

66 self.message: StreamMessage[Any] | None = None 

67 

68 async def consume_scope( 

69 self, 

70 call_next: "AsyncFuncAny", 

71 msg: "StreamMessage[Any]", 

72 ) -> Any: 

73 self.message = msg 

74 if self.ack_policy is AckPolicy.ACK_FIRST: 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true

75 await self.__ack() 

76 

77 return await call_next(msg) 

78 

79 async def __aexit__( 

80 self, 

81 exc_type: type[BaseException] | None = None, 

82 exc_val: BaseException | None = None, 

83 exc_tb: Optional["TracebackType"] = None, 

84 ) -> bool | None: 

85 if self.ack_policy is AckPolicy.ACK_FIRST: 85 ↛ 86line 85 didn't jump to line 86 because the condition on line 85 was never true

86 return False 

87 

88 if not exc_type: 

89 await self.__ack() 

90 

91 elif isinstance(exc_val, HandlerException): 

92 if isinstance(exc_val, AckMessage): 

93 await self.__ack(**exc_val.extra_options) 

94 

95 elif isinstance(exc_val, NackMessage): 

96 await self.__nack(**exc_val.extra_options) 

97 

98 elif isinstance(exc_val, RejectMessage): # pragma: no branch 

99 await self.__reject(**exc_val.extra_options) 

100 

101 # Exception was processed and suppressed 

102 return True 

103 

104 elif self.ack_policy is AckPolicy.REJECT_ON_ERROR: 

105 await self.__reject() 

106 

107 elif self.ack_policy is AckPolicy.NACK_ON_ERROR: 

108 await self.__nack() 

109 

110 elif self.ack_policy is AckPolicy.ACK: 110 ↛ 114line 110 didn't jump to line 114 because the condition on line 110 was always true

111 await self.__ack() 

112 

113 # Exception was not processed 

114 return False 

115 

116 async def __ack(self, **exc_extra_options: Any) -> None: 

117 if self.message: 

118 try: 

119 await self.message.ack(**exc_extra_options, **self.extra_options) 

120 except Exception as er: 

121 if self.logger is not None: 

122 self.logger.log(repr(er), logging.CRITICAL, exc_info=er) 

123 

124 async def __nack(self, **exc_extra_options: Any) -> None: 

125 if self.message: 125 ↛ exitline 125 didn't return from function '__nack' because the condition on line 125 was always true

126 try: 

127 await self.message.nack(**exc_extra_options, **self.extra_options) 

128 except Exception as er: 

129 if self.logger is not None: 

130 self.logger.log(repr(er), logging.CRITICAL, exc_info=er) 

131 

132 async def __reject(self, **exc_extra_options: Any) -> None: 

133 if self.message: 

134 try: 

135 await self.message.reject(**exc_extra_options, **self.extra_options) 

136 except Exception as er: 

137 if self.logger is not None: 

138 self.logger.log(repr(er), logging.CRITICAL, exc_info=er)