Coverage for tests / brokers / redis / test_test_client.py: 96%
137 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
3import pytest
5from faststream import BaseMiddleware
6from faststream.redis import ListSub, StreamSub
7from faststream.redis.testing import FakeProducer
8from tests.brokers.base.testclient import BrokerTestclientTestcase
10from .basic import RedisMemoryTestcaseConfig
13@pytest.mark.redis()
14@pytest.mark.asyncio()
15class TestTestclient(RedisMemoryTestcaseConfig, BrokerTestclientTestcase):
16 @pytest.mark.connected()
17 async def test_with_real_testclient(
18 self,
19 queue: str,
20 ) -> None:
21 event = asyncio.Event()
23 broker = self.get_broker()
25 @broker.subscriber(queue)
26 def subscriber(m) -> None:
27 event.set()
29 async with self.patch_broker(broker, with_real=True) as br:
30 await asyncio.wait(
31 (
32 asyncio.create_task(br.publish("hello", queue)),
33 asyncio.create_task(event.wait()),
34 ),
35 timeout=3,
36 )
38 assert event.is_set()
40 async def test_respect_middleware(self, queue: str) -> None:
41 routes = []
43 class Middleware(BaseMiddleware):
44 async def on_receive(self) -> None:
45 routes.append(None)
46 return await super().on_receive()
48 broker = self.get_broker(middlewares=(Middleware,))
50 @broker.subscriber(queue)
51 async def h1(m) -> None: ...
53 @broker.subscriber(queue + "1")
54 async def h2(m) -> None: ...
56 async with self.patch_broker(broker) as br:
57 await br.publish("", queue)
58 await br.publish("", queue + "1")
60 assert len(routes) == 2
62 @pytest.mark.connected()
63 async def test_real_respect_middleware(self, queue: str) -> None:
64 routes = []
66 class Middleware(BaseMiddleware):
67 async def on_receive(self) -> None:
68 routes.append(None)
69 return await super().on_receive()
71 broker = self.get_broker(middlewares=(Middleware,))
73 @broker.subscriber(queue)
74 async def h1(m) -> None: ...
76 @broker.subscriber(queue + "1")
77 async def h2(m) -> None: ...
79 async with self.patch_broker(broker, with_real=True) as br:
80 await br.publish("", queue)
81 await br.publish("", queue + "1")
82 await h1.wait_call(3)
83 await h2.wait_call(3)
85 assert len(routes) == 2
87 async def test_pub_sub_pattern(self) -> None:
88 broker = self.get_broker()
90 @broker.subscriber("test.{name}")
91 async def handler(msg):
92 return msg
94 async with self.patch_broker(broker) as br:
95 assert await (await br.request(1, "test.name.useless")).decode() == 1
96 handler.mock.assert_called_once_with(1)
98 async def test_list(
99 self,
100 queue: str,
101 ) -> None:
102 broker = self.get_broker()
104 @broker.subscriber(list=queue)
105 async def handler(msg):
106 return msg
108 async with self.patch_broker(broker) as br:
109 assert await (await br.request(1, list=queue)).decode() == 1
110 handler.mock.assert_called_once_with(1)
112 async def test_batch_pub_by_default_pub(
113 self,
114 queue: str,
115 ) -> None:
116 broker = self.get_broker()
118 @broker.subscriber(list=ListSub(queue, batch=True))
119 async def m(msg) -> None:
120 pass
122 async with self.patch_broker(broker) as br:
123 await br.publish("hello", list=queue)
124 m.mock.assert_called_once_with(["hello"])
126 async def test_batch_pub_by_pub_batch(
127 self,
128 queue: str,
129 ) -> None:
130 broker = self.get_broker()
132 @broker.subscriber(list=ListSub(queue, batch=True))
133 async def m(msg) -> None:
134 pass
136 async with self.patch_broker(broker) as br:
137 await br.publish_batch("hello", list=queue)
138 m.mock.assert_called_once_with(["hello"])
140 async def test_batch_publisher_mock(
141 self,
142 queue: str,
143 ) -> None:
144 broker = self.get_broker()
146 batch_list = ListSub(queue + "1", batch=True)
147 publisher = broker.publisher(list=batch_list)
149 @publisher
150 @broker.subscriber(queue)
151 async def m(msg):
152 return 1, 2, 3
154 async with self.patch_broker(broker) as br:
155 await br.publish("hello", queue)
156 m.mock.assert_called_once_with("hello")
157 publisher.mock.assert_called_once_with([1, 2, 3])
159 async def test_stream(
160 self,
161 queue: str,
162 ) -> None:
163 broker = self.get_broker()
165 @broker.subscriber(stream=queue)
166 async def handler(msg):
167 return msg
169 async with self.patch_broker(broker) as br:
170 assert await (await br.request(1, stream=queue)).decode() == 1
171 handler.mock.assert_called_once_with(1)
173 async def test_stream_batch_pub_by_default_pub(
174 self,
175 queue: str,
176 ) -> None:
177 broker = self.get_broker()
179 @broker.subscriber(stream=StreamSub(queue, batch=True))
180 async def m(msg) -> None:
181 pass
183 async with self.patch_broker(broker) as br:
184 await br.publish("hello", stream=queue)
185 m.mock.assert_called_once_with(["hello"])
187 async def test_stream_publisher(
188 self,
189 queue: str,
190 ) -> None:
191 broker = self.get_broker()
193 batch_stream = StreamSub(queue + "1")
194 publisher = broker.publisher(stream=batch_stream)
196 @publisher
197 @broker.subscriber(queue)
198 async def m(msg):
199 return 1, 2, 3
201 async with self.patch_broker(broker) as br:
202 await br.publish("hello", queue)
203 m.mock.assert_called_once_with("hello")
204 publisher.mock.assert_called_once_with([1, 2, 3])
206 async def test_publish_to_none(self) -> None:
207 broker = self.get_broker()
209 async with self.patch_broker(broker) as br:
210 with pytest.raises(ValueError): # noqa: PT011
211 await br.publish("hello")
213 @pytest.mark.connected()
214 async def test_broker_gets_patched_attrs_within_cm(self) -> None:
215 await super().test_broker_gets_patched_attrs_within_cm(FakeProducer)
217 @pytest.mark.connected()
218 async def test_broker_with_real_doesnt_get_patched(self) -> None:
219 await super().test_broker_with_real_doesnt_get_patched()
221 @pytest.mark.connected()
222 async def test_broker_with_real_patches_publishers_and_subscribers(
223 self,
224 queue: str,
225 ) -> None:
226 await super().test_broker_with_real_patches_publishers_and_subscribers(queue)
228 @pytest.mark.xfail(reason="https://github.com/ag2ai/faststream/issues/2513")
229 async def test_publisher_without_destination(self) -> None:
230 """Fixes https://github.com/ag2ai/faststream/issues/2513."""
231 broker = self.get_broker()
233 # use two publishers to check that we don't have conflicts
234 channel_publisher = broker.publisher(channel="")
235 another_channel_publisher = broker.publisher(channel="")
237 list_publisher = broker.publisher(list="")
238 stream_publisher = broker.publisher(stream="")
240 async with self.patch_broker(broker):
241 await channel_publisher.publish(None, channel="new-key")
242 channel_publisher.mock.assert_called_once()
244 await another_channel_publisher.publish(None, channel="new-key")
245 another_channel_publisher.mock.assert_called_once()
247 await list_publisher.publish(None, list="new-key")
248 list_publisher.mock.assert_called_once()
250 await stream_publisher.publish(None, stream="new-key")
251 stream_publisher.mock.assert_called_once()