Coverage for tests / cli / conftest.py: 93%
89 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import os
2import select
3import signal
4import subprocess
5import threading
6import time
7from collections.abc import Generator
8from contextlib import contextmanager, suppress
9from pathlib import Path
10from textwrap import dedent
12import pytest
14from faststream import FastStream
15from faststream._internal._compat import IS_WINDOWS
16from tests.cli import interfaces
19@pytest.fixture()
20def broker():
21 # separate import from e2e tests
22 from faststream.rabbit import RabbitBroker
24 return RabbitBroker()
27@pytest.fixture()
28def app_without_logger(broker) -> FastStream:
29 return FastStream(broker, logger=None)
32@pytest.fixture()
33def app_without_broker() -> FastStream:
34 return FastStream()
37@pytest.fixture()
38def app(broker) -> FastStream:
39 return FastStream(broker)
42@pytest.fixture()
43def faststream_tmp_path(tmp_path: "Path"):
44 faststream_tmp = tmp_path / "faststream_templates"
45 faststream_tmp.mkdir(exist_ok=True)
46 return faststream_tmp
49@pytest.fixture()
50def generate_template(
51 faststream_tmp_path: "Path",
52) -> interfaces.GenerateTemplateFactory:
53 @contextmanager
54 def factory(
55 code: str,
56 filename: str = "temp_app.py",
57 ) -> Generator["Path", None, None]:
58 file_path: Path = faststream_tmp_path / filename
59 cleaned_code = dedent(code).strip()
61 file_path.write_text(cleaned_code, encoding="utf-8")
63 try:
64 yield file_path
65 finally:
66 file_path.unlink(missing_ok=True)
68 return factory
71class CLIThread:
72 def __init__(
73 self,
74 command: tuple[str, ...],
75 env: dict[str, str],
76 ) -> None:
77 self.process = subprocess.Popen(
78 command,
79 stderr=subprocess.PIPE,
80 text=True,
81 shell=False,
82 env=env,
83 )
84 self.running = True
85 self.started = False
87 self.stderr = ""
89 self.__std_poll_thread = threading.Thread(target=self._poll_std)
90 self.__std_poll_thread.start()
92 def _poll_std(self) -> None:
93 assert self.process.stderr
95 if IS_WINDOWS:
96 return
98 while self.running:
99 rlist, _, _ = select.select([self.process.stderr], [], [], 0.1)
100 if rlist:
101 self.started = True
103 if line := self.process.stderr.readline():
104 self.stderr += line.strip()
106 else:
107 break
109 elif self.process.poll() is not None: 109 ↛ 110line 109 didn't jump to line 110 because the condition on line 109 was never true
110 break
112 def wait_for_stderr(self, message: str, timeout: float = 2.0) -> bool:
113 assert self.process.stderr
115 if message in self.stderr:
116 return True
118 expiration_time = time.time() + timeout
120 while time.time() < expiration_time:
121 time.sleep(0.1)
122 if message in self.stderr:
123 return True
125 if self.process.returncode is not None: 125 ↛ 126line 125 didn't jump to line 126 because the condition on line 125 was never true
126 return message in self.process.stderr.read()
128 return False
130 def wait(self, timeout: float) -> None:
131 self.process.wait(timeout)
133 def signint(self) -> None:
134 if IS_WINDOWS:
135 self.process.terminate()
136 else:
137 self.process.send_signal(signal.SIGINT)
139 def stop(self) -> None:
140 self.process.terminate()
142 self.running = False
143 with suppress(Exception):
144 self.__std_poll_thread.join()
146 try:
147 self.wait(5)
149 except subprocess.TimeoutExpired:
150 self.process.kill()
153@pytest.fixture()
154def faststream_cli(
155 faststream_tmp_path: Path,
156) -> interfaces.FastStreamCLIFactory:
157 @contextmanager
158 def cli_factory(
159 *cmd: str,
160 wait_time: float = 2.0,
161 extra_env: dict[str, str] | None = None,
162 ) -> Generator[CLIThread, None, None]:
163 env = (
164 os.environ.copy()
165 | {
166 "PATH": f"{faststream_tmp_path}:{os.environ['PATH']}",
167 "PYTHONPATH": str(faststream_tmp_path),
168 }
169 | (extra_env or {})
170 )
172 cli = CLIThread(cmd, env)
174 wait_for_startup(cli, wait_time)
176 try:
177 yield cli
178 finally:
179 cli.stop()
181 return cli_factory
184def wait_for_startup(
185 cli: CLIThread,
186 timeout: float = 10,
187 check_interval: float = 0.1,
188) -> None:
189 start_time = time.time()
191 while time.time() - start_time < timeout:
192 if cli.started:
193 return
195 if cli.process.poll() is not None: 195 ↛ 196line 195 didn't jump to line 196 because the condition on line 195 was never true
196 return
198 time.sleep(check_interval)
200 return