Coverage for tests/brokers/nats/test_test_client.py: 97%
127 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
3import pytest
5from faststream import BaseMiddleware
6from faststream.nats import (
7 ConsumerConfig,
8 JStream,
9 PullSub,
10)
11from faststream.nats.testing import FakeProducer
12from tests.brokers.base.testclient import BrokerTestclientTestcase
14from .basic import NatsMemoryTestcaseConfig
@pytest.mark.nats()
@pytest.mark.asyncio()
class TestTestclient(NatsMemoryTestcaseConfig, BrokerTestclientTestcase):
    """NATS-specific checks for the broker test client.

    Brokers are created with ``self.get_broker`` and patched with
    ``self.patch_broker``; both helpers are not defined in this class and are
    presumably provided by the base classes. The class-level ``asyncio`` mark
    already applies to every coroutine test below, so individual methods do
    not repeat it.
    """

    async def test_stream_publish(
        self,
        queue: str,
    ) -> None:
        """A publish to stream ``"test"`` reaches the subscriber on that stream."""
        pub_broker = self.get_broker(apply_types=False)

        @pub_broker.subscriber(queue, stream="test")
        async def m(msg) -> None: ...

        async with self.patch_broker(pub_broker) as br:
            await br.publish("Hi!", queue, stream="test")
            m.mock.assert_called_once_with("Hi!")

    async def test_wrong_stream_publish(
        self,
        queue: str,
    ) -> None:
        """A subscriber declared without a stream is not called by a stream publish."""
        pub_broker = self.get_broker(apply_types=False)

        @pub_broker.subscriber(queue)
        async def m(msg) -> None: ...

        async with self.patch_broker(pub_broker) as br:
            await br.publish("Hi!", queue, stream="test")
            assert not m.mock.called

    @pytest.mark.connected()
    async def test_with_real_testclient(
        self,
        queue: str,
    ) -> None:
        """With ``with_real=True`` a published message triggers the subscriber."""
        event = asyncio.Event()

        broker = self.get_broker()

        @broker.subscriber(queue)
        def subscriber(m) -> None:
            event.set()

        async with self.patch_broker(broker, with_real=True) as br:
            _, pending = await asyncio.wait(
                (
                    asyncio.create_task(br.publish("hello", queue)),
                    asyncio.create_task(event.wait()),
                ),
                timeout=3,
            )
            # asyncio.wait() does not cancel anything on timeout; cancel the
            # leftovers so no task is still pending when the broker closes.
            for task in pending:
                task.cancel()

        assert event.is_set()

    @pytest.mark.connected()
    async def test_inbox_prefix_with_real(
        self,
        queue: str,
    ) -> None:
        """``inbox_prefix`` given to the broker shows up on the patched connection."""
        broker = self.get_broker(inbox_prefix="test")

        async with self.patch_broker(broker, with_real=True) as br:
            assert br._connection._inbox_prefix == b"test"
            assert "test" in str(br._connection.new_inbox())

    async def test_respect_middleware(self, queue: str) -> None:
        """The broker middleware's ``on_receive`` runs once per published message."""
        routes = []

        class Middleware(BaseMiddleware):
            async def on_receive(self) -> None:
                # Record the call; only the number of entries is asserted.
                routes.append(None)
                return await super().on_receive()

        broker = self.get_broker(middlewares=(Middleware,))

        @broker.subscriber(queue)
        async def h1(m) -> None: ...

        @broker.subscriber(queue + "1")
        async def h2(m) -> None: ...

        async with self.patch_broker(broker) as br:
            await br.publish("", queue)
            await br.publish("", queue + "1")

        assert len(routes) == 2

    @pytest.mark.connected()
    async def test_real_respect_middleware(self, queue: str) -> None:
        """Same middleware check as above, with the broker patched ``with_real=True``."""
        routes = []

        class Middleware(BaseMiddleware):
            async def on_receive(self) -> None:
                # Record the call; only the number of entries is asserted.
                routes.append(None)
                return await super().on_receive()

        broker = self.get_broker(middlewares=(Middleware,))

        @broker.subscriber(queue)
        async def h1(m) -> None: ...

        @broker.subscriber(queue + "1")
        async def h2(m) -> None: ...

        async with self.patch_broker(broker, with_real=True) as br:
            await br.publish("", queue)
            await br.publish("", queue + "1")
            # Wait for both handlers before counting middleware calls.
            await h1.wait_call(3)
            await h2.wait_call(3)

        assert len(routes) == 2

    async def test_js_subscriber_mock(
        self,
        queue: str,
        stream: JStream,
    ) -> None:
        """A subscriber bound to a ``JStream`` gets a publish addressed by stream name."""
        broker = self.get_broker()

        @broker.subscriber(queue, stream=stream)
        async def m(msg) -> None:
            pass

        async with self.patch_broker(broker) as br:
            await br.publish("hello", queue, stream=stream.name)
            m.mock.assert_called_once_with("hello")

    async def test_js_publisher_mock(
        self,
        queue: str,
        stream: JStream,
    ) -> None:
        """The handler's return value is recorded on the decorating publisher's mock."""
        broker = self.get_broker()

        publisher = broker.publisher(queue + "resp")

        @publisher
        @broker.subscriber(queue, stream=stream)
        async def m(msg) -> str:
            return "response"

        async with self.patch_broker(broker) as br:
            await br.publish("hello", queue, stream=stream.name)
            publisher.mock.assert_called_with("response")

    async def test_any_subject_routing(self) -> None:
        """Pattern ``test.*.subj.*`` matches subject ``test.a.subj.b``."""
        broker = self.get_broker()

        @broker.subscriber("test.*.subj.*")
        def subscriber(msg) -> None: ...

        async with self.patch_broker(broker) as br:
            await br.publish("hello", "test.a.subj.b")
            subscriber.mock.assert_called_once_with("hello")

    async def test_ending_subject_routing(self) -> None:
        """Pattern ``test.>`` matches subject ``test.a.subj.b``."""
        broker = self.get_broker()

        @broker.subscriber("test.>")
        def subscriber(msg) -> None: ...

        async with self.patch_broker(broker) as br:
            await br.publish("hello", "test.a.subj.b")
            subscriber.mock.assert_called_once_with("hello")

    async def test_mixed_subject_routing(self) -> None:
        """Pattern ``*.*.subj.>`` matches subject ``test.a.subj.b.c``."""
        broker = self.get_broker()

        @broker.subscriber("*.*.subj.>")
        def subscriber(msg) -> None: ...

        async with self.patch_broker(broker) as br:
            await br.publish("hello", "test.a.subj.b.c")
            subscriber.mock.assert_called_once_with("hello")

    async def test_consume_pull(
        self,
        queue: str,
        stream: JStream,
    ) -> None:
        """A ``PullSub(1)`` subscriber receives the published message as-is."""
        broker = self.get_broker()

        @broker.subscriber(queue, stream=stream, pull_sub=PullSub(1))
        def subscriber(m) -> None: ...

        async with self.patch_broker(broker) as br:
            await br.publish("hello", queue)
            subscriber.mock.assert_called_once_with("hello")

    async def test_consume_batch(
        self,
        queue: str,
        stream: JStream,
    ) -> None:
        """A ``PullSub(1, batch=True)`` subscriber receives the message in a list."""
        broker = self.get_broker()

        @broker.subscriber(
            queue,
            stream=stream,
            pull_sub=PullSub(1, batch=True),
        )
        def subscriber(m) -> None:
            pass

        async with self.patch_broker(broker) as br:
            await br.publish("hello", queue)
            subscriber.mock.assert_called_once_with(["hello"])

    async def test_consume_with_subject_filter(self, queue: str) -> None:
        """Only the subject listed in ``filter_subjects`` reaches the subscriber."""
        broker = self.get_broker()

        @broker.subscriber(
            config=ConsumerConfig(filter_subjects=[f"{queue}.a"]),
            stream=JStream(queue, subjects=[f"{queue}.*"]),
        )
        def subscriber(m) -> None:
            pass

        async with self.patch_broker(broker) as br:
            # "<queue>.b" is covered by the stream but not by the filter.
            await br.publish(1, f"{queue}.b")
            await br.publish(2, f"{queue}.a")
            subscriber.mock.assert_called_once_with(2)

    @pytest.mark.connected()
    async def test_broker_gets_patched_attrs_within_cm(self) -> None:
        """Run the inherited test, passing the NATS ``FakeProducer``."""
        await super().test_broker_gets_patched_attrs_within_cm(FakeProducer)

    @pytest.mark.connected()
    async def test_broker_with_real_doesnt_get_patched(self) -> None:
        """Run the inherited test under the ``connected`` mark."""
        await super().test_broker_with_real_doesnt_get_patched()

    @pytest.mark.connected()
    async def test_broker_with_real_patches_publishers_and_subscribers(
        self,
        queue: str,
    ) -> None:
        """Run the inherited test under the ``connected`` mark."""
        await super().test_broker_with_real_patches_publishers_and_subscribers(queue)

    @pytest.mark.xfail(reason="https://github.com/ag2ai/faststream/issues/2513")
    async def test_publisher_without_destination(self) -> None:
        """Publishers created with an empty subject can publish to an explicit one.

        Regression test for https://github.com/ag2ai/faststream/issues/2513;
        currently marked ``xfail``.
        """
        broker = self.get_broker()

        # use two publishers to check that we don't have conflicts
        publisher = broker.publisher(subject="")
        another_publisher = broker.publisher(subject="")

        async with self.patch_broker(broker):
            await publisher.publish(None, subject="new-key")
            publisher.mock.assert_called_once()

            await another_publisher.publish(None, subject="new-key")
            another_publisher.mock.assert_called_once()