Coverage for tests / brokers / redis / test_test_client.py: 96%

137 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2 

3import pytest 

4 

5from faststream import BaseMiddleware 

6from faststream.redis import ListSub, StreamSub 

7from faststream.redis.testing import FakeProducer 

8from tests.brokers.base.testclient import BrokerTestclientTestcase 

9 

10from .basic import RedisMemoryTestcaseConfig 

11 

12 

13@pytest.mark.redis() 

14@pytest.mark.asyncio() 

15class TestTestclient(RedisMemoryTestcaseConfig, BrokerTestclientTestcase): 

16 @pytest.mark.connected() 

17 async def test_with_real_testclient( 

18 self, 

19 queue: str, 

20 ) -> None: 

21 event = asyncio.Event() 

22 

23 broker = self.get_broker() 

24 

25 @broker.subscriber(queue) 

26 def subscriber(m) -> None: 

27 event.set() 

28 

29 async with self.patch_broker(broker, with_real=True) as br: 

30 await asyncio.wait( 

31 ( 

32 asyncio.create_task(br.publish("hello", queue)), 

33 asyncio.create_task(event.wait()), 

34 ), 

35 timeout=3, 

36 ) 

37 

38 assert event.is_set() 

39 

40 async def test_respect_middleware(self, queue: str) -> None: 

41 routes = [] 

42 

43 class Middleware(BaseMiddleware): 

44 async def on_receive(self) -> None: 

45 routes.append(None) 

46 return await super().on_receive() 

47 

48 broker = self.get_broker(middlewares=(Middleware,)) 

49 

50 @broker.subscriber(queue) 

51 async def h1(m) -> None: ... 

52 

53 @broker.subscriber(queue + "1") 

54 async def h2(m) -> None: ... 

55 

56 async with self.patch_broker(broker) as br: 

57 await br.publish("", queue) 

58 await br.publish("", queue + "1") 

59 

60 assert len(routes) == 2 

61 

62 @pytest.mark.connected() 

63 async def test_real_respect_middleware(self, queue: str) -> None: 

64 routes = [] 

65 

66 class Middleware(BaseMiddleware): 

67 async def on_receive(self) -> None: 

68 routes.append(None) 

69 return await super().on_receive() 

70 

71 broker = self.get_broker(middlewares=(Middleware,)) 

72 

73 @broker.subscriber(queue) 

74 async def h1(m) -> None: ... 

75 

76 @broker.subscriber(queue + "1") 

77 async def h2(m) -> None: ... 

78 

79 async with self.patch_broker(broker, with_real=True) as br: 

80 await br.publish("", queue) 

81 await br.publish("", queue + "1") 

82 await h1.wait_call(3) 

83 await h2.wait_call(3) 

84 

85 assert len(routes) == 2 

86 

87 async def test_pub_sub_pattern(self) -> None: 

88 broker = self.get_broker() 

89 

90 @broker.subscriber("test.{name}") 

91 async def handler(msg): 

92 return msg 

93 

94 async with self.patch_broker(broker) as br: 

95 assert await (await br.request(1, "test.name.useless")).decode() == 1 

96 handler.mock.assert_called_once_with(1) 

97 

98 async def test_list( 

99 self, 

100 queue: str, 

101 ) -> None: 

102 broker = self.get_broker() 

103 

104 @broker.subscriber(list=queue) 

105 async def handler(msg): 

106 return msg 

107 

108 async with self.patch_broker(broker) as br: 

109 assert await (await br.request(1, list=queue)).decode() == 1 

110 handler.mock.assert_called_once_with(1) 

111 

112 async def test_batch_pub_by_default_pub( 

113 self, 

114 queue: str, 

115 ) -> None: 

116 broker = self.get_broker() 

117 

118 @broker.subscriber(list=ListSub(queue, batch=True)) 

119 async def m(msg) -> None: 

120 pass 

121 

122 async with self.patch_broker(broker) as br: 

123 await br.publish("hello", list=queue) 

124 m.mock.assert_called_once_with(["hello"]) 

125 

126 async def test_batch_pub_by_pub_batch( 

127 self, 

128 queue: str, 

129 ) -> None: 

130 broker = self.get_broker() 

131 

132 @broker.subscriber(list=ListSub(queue, batch=True)) 

133 async def m(msg) -> None: 

134 pass 

135 

136 async with self.patch_broker(broker) as br: 

137 await br.publish_batch("hello", list=queue) 

138 m.mock.assert_called_once_with(["hello"]) 

139 

140 async def test_batch_publisher_mock( 

141 self, 

142 queue: str, 

143 ) -> None: 

144 broker = self.get_broker() 

145 

146 batch_list = ListSub(queue + "1", batch=True) 

147 publisher = broker.publisher(list=batch_list) 

148 

149 @publisher 

150 @broker.subscriber(queue) 

151 async def m(msg): 

152 return 1, 2, 3 

153 

154 async with self.patch_broker(broker) as br: 

155 await br.publish("hello", queue) 

156 m.mock.assert_called_once_with("hello") 

157 publisher.mock.assert_called_once_with([1, 2, 3]) 

158 

159 async def test_stream( 

160 self, 

161 queue: str, 

162 ) -> None: 

163 broker = self.get_broker() 

164 

165 @broker.subscriber(stream=queue) 

166 async def handler(msg): 

167 return msg 

168 

169 async with self.patch_broker(broker) as br: 

170 assert await (await br.request(1, stream=queue)).decode() == 1 

171 handler.mock.assert_called_once_with(1) 

172 

173 async def test_stream_batch_pub_by_default_pub( 

174 self, 

175 queue: str, 

176 ) -> None: 

177 broker = self.get_broker() 

178 

179 @broker.subscriber(stream=StreamSub(queue, batch=True)) 

180 async def m(msg) -> None: 

181 pass 

182 

183 async with self.patch_broker(broker) as br: 

184 await br.publish("hello", stream=queue) 

185 m.mock.assert_called_once_with(["hello"]) 

186 

187 async def test_stream_publisher( 

188 self, 

189 queue: str, 

190 ) -> None: 

191 broker = self.get_broker() 

192 

193 batch_stream = StreamSub(queue + "1") 

194 publisher = broker.publisher(stream=batch_stream) 

195 

196 @publisher 

197 @broker.subscriber(queue) 

198 async def m(msg): 

199 return 1, 2, 3 

200 

201 async with self.patch_broker(broker) as br: 

202 await br.publish("hello", queue) 

203 m.mock.assert_called_once_with("hello") 

204 publisher.mock.assert_called_once_with([1, 2, 3]) 

205 

206 async def test_publish_to_none(self) -> None: 

207 broker = self.get_broker() 

208 

209 async with self.patch_broker(broker) as br: 

210 with pytest.raises(ValueError): # noqa: PT011 

211 await br.publish("hello") 

212 

213 @pytest.mark.connected() 

214 async def test_broker_gets_patched_attrs_within_cm(self) -> None: 

215 await super().test_broker_gets_patched_attrs_within_cm(FakeProducer) 

216 

217 @pytest.mark.connected() 

218 async def test_broker_with_real_doesnt_get_patched(self) -> None: 

219 await super().test_broker_with_real_doesnt_get_patched() 

220 

221 @pytest.mark.connected() 

222 async def test_broker_with_real_patches_publishers_and_subscribers( 

223 self, 

224 queue: str, 

225 ) -> None: 

226 await super().test_broker_with_real_patches_publishers_and_subscribers(queue) 

227 

228 @pytest.mark.xfail(reason="https://github.com/ag2ai/faststream/issues/2513") 

229 async def test_publisher_without_destination(self) -> None: 

230 """Fixes https://github.com/ag2ai/faststream/issues/2513.""" 

231 broker = self.get_broker() 

232 

233 # use two publishers to check that we don't have conflicts 

234 channel_publisher = broker.publisher(channel="") 

235 another_channel_publisher = broker.publisher(channel="") 

236 

237 list_publisher = broker.publisher(list="") 

238 stream_publisher = broker.publisher(stream="") 

239 

240 async with self.patch_broker(broker): 

241 await channel_publisher.publish(None, channel="new-key") 

242 channel_publisher.mock.assert_called_once() 

243 

244 await another_channel_publisher.publish(None, channel="new-key") 

245 another_channel_publisher.mock.assert_called_once() 

246 

247 await list_publisher.publish(None, list="new-key") 

248 list_publisher.mock.assert_called_once() 

249 

250 await stream_publisher.publish(None, stream="new-key") 

251 stream_publisher.mock.assert_called_once()