Coverage for faststream / middlewares / acknowledgement / middleware.py: 91%
51 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import logging
2from typing import TYPE_CHECKING, Any, Optional
4from faststream._internal.middlewares import BaseMiddleware
5from faststream.exceptions import (
6 AckMessage,
7 HandlerException,
8 NackMessage,
9 RejectMessage,
10)
12from .config import AckPolicy
14if TYPE_CHECKING:
15 from types import TracebackType
17 from faststream._internal.basic_types import AsyncFuncAny
18 from faststream._internal.context.repository import ContextRepo
19 from faststream._internal.logger import LoggerState
20 from faststream.message import StreamMessage
23class AcknowledgementMiddleware:
24 def __init__(
25 self,
26 logger: "LoggerState",
27 ack_policy: "AckPolicy",
28 extra_options: dict[str, Any],
29 ) -> None:
30 self.ack_policy = ack_policy
31 self.extra_options = extra_options
32 self.logger = logger
34 def __call__(
35 self,
36 msg: Any | None,
37 context: "ContextRepo",
38 ) -> "_AcknowledgementMiddleware":
39 return _AcknowledgementMiddleware(
40 msg,
41 logger=self.logger,
42 ack_policy=self.ack_policy,
43 extra_options=self.extra_options,
44 context=context,
45 )
class _AcknowledgementMiddleware(BaseMiddleware):
    """Middleware that acks, nacks or rejects a message around its handler.

    The decision is driven by ``ack_policy`` and by the exception (if any)
    that reaches ``__aexit__``. Failures of the acknowledgement call itself
    are never propagated; they are logged at ``CRITICAL`` level instead.
    """

    def __init__(
        self,
        msg: Any | None,
        /,
        *,
        logger: "LoggerState",
        context: "ContextRepo",
        extra_options: dict[str, Any],
        # can't be created with AckPolicy.MANUAL
        ack_policy: AckPolicy,
    ) -> None:
        """Store the acknowledgement settings for this middleware instance.

        Args:
            msg: Raw incoming message, passed through to ``BaseMiddleware``.
            logger: Logger state used to report acknowledgement failures.
            context: Context repository, passed through to ``BaseMiddleware``.
            extra_options: Default keyword options added to every
                ``ack`` / ``nack`` / ``reject`` call.
            ack_policy: Policy that selects what happens to the message.
        """
        super().__init__(msg, context=context)

        self.ack_policy = ack_policy
        self.extra_options = extra_options
        self.logger = logger

        # Populated by ``consume_scope``; while it is unset every
        # acknowledgement helper is a no-op.
        self.message: StreamMessage[Any] | None = None

    async def consume_scope(
        self,
        call_next: "AsyncFuncAny",
        msg: "StreamMessage[Any]",
    ) -> Any:
        """Remember ``msg`` and pass it on to the next callable in the chain.

        With ``AckPolicy.ACK_FIRST`` the message is acknowledged before
        ``call_next`` is awaited.

        Returns:
            Whatever ``call_next(msg)`` returns.
        """
        self.message = msg
        if self.ack_policy is AckPolicy.ACK_FIRST:
            await self.__ack()

        return await call_next(msg)

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_val: BaseException | None = None,
        exc_tb: Optional["TracebackType"] = None,
    ) -> bool | None:
        """Settle the message according to the outcome of the scope.

        Returns:
            ``True`` when a ``HandlerException`` was handled here and must be
            suppressed; ``False`` otherwise, so any exception propagates.
        """
        if self.ack_policy is AckPolicy.ACK_FIRST:
            # Already acknowledged in ``consume_scope``.
            return False

        if exc_type is None:
            await self.__ack()

        elif isinstance(exc_val, HandlerException):
            # Explicit control-flow exceptions override the configured policy.
            if isinstance(exc_val, AckMessage):
                await self.__ack(**exc_val.extra_options)

            elif isinstance(exc_val, NackMessage):
                await self.__nack(**exc_val.extra_options)

            elif isinstance(exc_val, RejectMessage):  # pragma: no branch
                await self.__reject(**exc_val.extra_options)

            # Exception was processed and suppressed
            return True

        elif self.ack_policy is AckPolicy.REJECT_ON_ERROR:
            await self.__reject()

        elif self.ack_policy is AckPolicy.NACK_ON_ERROR:
            await self.__nack()

        elif self.ack_policy is AckPolicy.ACK:
            await self.__ack()

        # Exception was not processed
        return False

    async def __ack(self, **exc_extra_options: Any) -> None:
        """Acknowledge the stored message, logging any failure."""
        await self.__settle("ack", exc_extra_options)

    async def __nack(self, **exc_extra_options: Any) -> None:
        """Negatively acknowledge the stored message, logging any failure."""
        await self.__settle("nack", exc_extra_options)

    async def __reject(self, **exc_extra_options: Any) -> None:
        """Reject the stored message, logging any failure."""
        await self.__settle("reject", exc_extra_options)

    async def __settle(
        self,
        action: str,
        exc_extra_options: dict[str, Any],
    ) -> None:
        """Call ``self.message.<action>`` with the merged keyword options.

        Args:
            action: Name of the message method to call
                (``"ack"``, ``"nack"`` or ``"reject"``).
            exc_extra_options: Options supplied by the raised exception.
        """
        message = self.message
        if not message:
            return

        # Merge into one mapping instead of unpacking both into the call:
        # a key present in both would raise ``TypeError`` (duplicate keyword
        # argument) and the message would silently stay unsettled. Options
        # coming from the exception take precedence over the defaults.
        options = {**self.extra_options, **exc_extra_options}

        try:
            await getattr(message, action)(**options)
        except Exception as er:
            # Best effort: an acknowledgement failure must not escape.
            if self.logger is not None:
                self.logger.log(repr(er), logging.CRITICAL, exc_info=er)