Coverage for tests / brokers / base / parser.py: 99%

158 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from unittest.mock import MagicMock 

3 

4import anyio 

5import pytest 

6 

7from faststream.message.message import StreamMessage 

8 

9from .basic import BaseTestcaseConfig 

10 

11 

12@pytest.mark.asyncio() 

13class LocalCustomParserTestcase(BaseTestcaseConfig): 

14 async def test_local_parser( 

15 self, 

16 mock: MagicMock, 

17 event: asyncio.Event, 

18 queue: str, 

19 ) -> None: 

20 broker = self.get_broker() 

21 

22 async def custom_parser(msg, original): 

23 msg = await original(msg) 

24 mock(msg.body) 

25 return msg 

26 

27 args, kwargs = self.get_subscriber_params(queue, parser=custom_parser) 

28 

29 @broker.subscriber(*args, **kwargs) 

30 async def handle(m) -> None: 

31 event.set() 

32 

33 async with self.patch_broker(broker) as br: 

34 await br.start() 

35 

36 await asyncio.wait( 

37 ( 

38 asyncio.create_task(br.publish(b"hello", queue)), 

39 asyncio.create_task(event.wait()), 

40 ), 

41 timeout=self.timeout, 

42 ) 

43 

44 assert event.is_set() 

45 mock.assert_called_once_with(b"hello") 

46 

47 async def test_local_sync_decoder( 

48 self, 

49 mock: MagicMock, 

50 event: asyncio.Event, 

51 queue: str, 

52 ) -> None: 

53 broker = self.get_broker() 

54 

55 def custom_decoder(msg): 

56 mock(msg.body) 

57 return msg 

58 

59 args, kwargs = self.get_subscriber_params(queue, decoder=custom_decoder) 

60 

61 @broker.subscriber(*args, **kwargs) 

62 async def handle(m) -> None: 

63 event.set() 

64 

65 async with self.patch_broker(broker) as br: 

66 await br.start() 

67 

68 await asyncio.wait( 

69 ( 

70 asyncio.create_task(br.publish(b"hello", queue)), 

71 asyncio.create_task(event.wait()), 

72 ), 

73 timeout=self.timeout, 

74 ) 

75 

76 assert event.is_set() 

77 mock.assert_called_once_with(b"hello") 

78 

79 async def test_global_sync_decoder( 

80 self, 

81 mock: MagicMock, 

82 event: asyncio.Event, 

83 queue: str, 

84 ) -> None: 

85 def custom_decoder(msg): 

86 mock(msg.body) 

87 return msg 

88 

89 broker = self.get_broker(decoder=custom_decoder) 

90 

91 args, kwargs = self.get_subscriber_params(queue) 

92 

93 @broker.subscriber(*args, **kwargs) 

94 async def handle(m) -> None: 

95 event.set() 

96 

97 async with self.patch_broker(broker) as br: 

98 await br.start() 

99 

100 await asyncio.wait( 

101 ( 

102 asyncio.create_task(br.publish(b"hello", queue)), 

103 asyncio.create_task(event.wait()), 

104 ), 

105 timeout=self.timeout, 

106 ) 

107 

108 assert event.is_set() 

109 mock.assert_called_once_with(b"hello") 

110 

111 async def test_local_parser_no_share_between_subscribers( 

112 self, 

113 mock: MagicMock, 

114 queue: str, 

115 ) -> None: 

116 event, event2 = asyncio.Event(), asyncio.Event() 

117 broker = self.get_broker() 

118 

119 async def custom_parser(msg, original): 

120 msg = await original(msg) 

121 mock(msg.body) 

122 return msg 

123 

124 args, kwargs = self.get_subscriber_params(queue, parser=custom_parser) 

125 args2, kwargs2 = self.get_subscriber_params(queue + "1") 

126 

127 @broker.subscriber(*args, **kwargs) 

128 @broker.subscriber(*args2, **kwargs2) 

129 async def handle(m) -> None: 

130 if event.is_set(): 

131 event2.set() 

132 else: 

133 event.set() 

134 

135 async with self.patch_broker(broker) as br: 

136 await br.start() 

137 

138 await asyncio.wait( 

139 ( 

140 asyncio.create_task(br.publish(b"hello", queue)), 

141 asyncio.create_task(br.publish(b"hello", queue + "1")), 

142 asyncio.create_task(event.wait()), 

143 asyncio.create_task(event2.wait()), 

144 ), 

145 timeout=self.timeout, 

146 ) 

147 

148 assert event.is_set() 

149 assert event2.is_set() 

150 mock.assert_called_once_with(b"hello") 

151 

152 async def test_local_parser_no_share_between_handlers( 

153 self, 

154 mock: MagicMock, 

155 queue: str, 

156 ) -> None: 

157 event, event2 = asyncio.Event(), asyncio.Event() 

158 

159 broker = self.get_broker() 

160 

161 args, kwargs = self.get_subscriber_params(queue) 

162 sub = broker.subscriber(*args, **kwargs) 

163 

164 @sub(filter=lambda m: m.content_type == "application/json") 

165 async def handle(m) -> None: 

166 event.set() 

167 

168 async def custom_parser(msg, original): 

169 msg = await original(msg) 

170 mock(msg.body) 

171 return msg 

172 

173 @sub(parser=custom_parser) 

174 async def handle2(m) -> None: 

175 event2.set() 

176 

177 async with self.patch_broker(broker) as br: 

178 await br.start() 

179 

180 await asyncio.wait( 

181 ( 

182 asyncio.create_task(br.publish({"msg": "hello"}, queue)), 

183 asyncio.create_task(br.publish(b"hello", queue)), 

184 asyncio.create_task(event.wait()), 

185 asyncio.create_task(event2.wait()), 

186 ), 

187 timeout=self.timeout, 

188 ) 

189 

190 assert event.is_set() 

191 assert event2.is_set() 

192 assert mock.call_count == 1 

193 

194 @pytest.mark.connected() 

195 async def test_iterator_respect_decoder( 

196 self, 

197 mock: MagicMock, 

198 queue: str, 

199 ) -> None: 

200 """Fixes https://github.com/ag2ai/faststream/issues/2554.""" 

201 start_event, consumed_event, stopped_event = ( 

202 asyncio.Event(), 

203 asyncio.Event(), 

204 asyncio.Event(), 

205 ) 

206 

207 broker = self.get_broker() 

208 

209 async def custom_decoder(msg, original): 

210 mock() 

211 consumed_event.set() 

212 return await original(msg) 

213 

214 args, kwargs = self.get_subscriber_params(queue, decoder=custom_decoder) 

215 sub = broker.subscriber(*args, **kwargs) 

216 

217 async def iter_messages() -> None: 

218 await sub.start() 

219 start_event.set() 

220 

221 async for m in sub: 221 ↛ 227line 221 didn't jump to line 227 because the loop on line 221 didn't complete

222 assert not mock.called 

223 msg = await m.decode() 

224 mock.assert_called_once() 

225 break 

226 

227 await sub.stop() 

228 stopped_event.set() 

229 return msg 

230 

231 async with broker: 

232 t = asyncio.create_task(iter_messages()) 

233 

234 with anyio.move_on_after(self.timeout): 

235 await start_event.wait() 

236 await broker.publish(b"hello", queue) 

237 await consumed_event.wait() 

238 await stopped_event.wait() 

239 

240 await t 

241 data = t.result() 

242 assert data == b"hello", data 

243 

244 @pytest.mark.connected() 

245 async def test_get_one_respect_decoder( 

246 self, 

247 queue: str, 

248 event: asyncio.Event, 

249 mock: MagicMock, 

250 ) -> None: 

251 """Fixes https://github.com/ag2ai/faststream/issues/2554.""" 

252 broker = self.get_broker() 

253 

254 async def custom_decoder(msg, original): 

255 mock() 

256 return await original(msg) 

257 

258 args, kwargs = self.get_subscriber_params(queue, decoder=custom_decoder) 

259 sub = broker.subscriber(*args, **kwargs) 

260 

261 async def get_msg() -> StreamMessage: 

262 await sub.start() 

263 event.set() 

264 msg = await sub.get_one(timeout=self.timeout) 

265 await sub.stop() 

266 return msg 

267 

268 async with broker: 

269 task = asyncio.create_task(get_msg()) 

270 

271 with anyio.move_on_after(self.timeout): 

272 await event.wait() 

273 await broker.publish(b"hello", queue) 

274 

275 await task 

276 msg = task.result() 

277 

278 assert not mock.called, mock.call_count 

279 await msg.decode() 

280 mock.assert_called_once() 

281 

282 

283class CustomParserTestcase(LocalCustomParserTestcase): 

284 async def test_global_parser( 

285 self, 

286 mock: MagicMock, 

287 event: asyncio.Event, 

288 queue: str, 

289 ) -> None: 

290 async def custom_parser(msg, original): 

291 msg = await original(msg) 

292 mock(msg.body) 

293 return msg 

294 

295 broker = self.get_broker(parser=custom_parser) 

296 

297 args, kwargs = self.get_subscriber_params(queue) 

298 

299 @broker.subscriber(*args, **kwargs) 

300 async def handle(m) -> None: 

301 event.set() 

302 

303 async with self.patch_broker(broker) as br: 

304 await br.start() 

305 

306 await asyncio.wait( 

307 ( 

308 asyncio.create_task(br.publish(b"hello", queue)), 

309 asyncio.create_task(event.wait()), 

310 ), 

311 timeout=self.timeout, 

312 ) 

313 

314 assert event.is_set() 

315 mock.assert_called_once_with(b"hello")