Coverage for tests/cli/conftest.py: 93%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import os 

2import select 

3import signal 

4import subprocess 

5import threading 

6import time 

7from collections.abc import Generator 

8from contextlib import contextmanager, suppress 

9from pathlib import Path 

10from textwrap import dedent 

11 

12import pytest 

13 

14from faststream import FastStream 

15from faststream._internal._compat import IS_WINDOWS 

16from tests.cli import interfaces 

17 

18 

@pytest.fixture()
def broker():
    """Provide a freshly constructed ``RabbitBroker`` to the requesting test."""
    # Imported locally to keep this import separate from the e2e tests.
    from faststream.rabbit import RabbitBroker

    rabbit_broker = RabbitBroker()
    return rabbit_broker

25 

26 

@pytest.fixture()
def app_without_logger(broker) -> FastStream:
    """Build a ``FastStream`` app on the ``broker`` fixture with ``logger=None``."""
    application = FastStream(broker, logger=None)
    return application

30 

31 

@pytest.fixture()
def app_without_broker() -> FastStream:
    """Build a ``FastStream`` app constructed without any arguments."""
    application = FastStream()
    return application

35 

36 

@pytest.fixture()
def app(broker) -> FastStream:
    """Build a ``FastStream`` app on top of the ``broker`` fixture."""
    application = FastStream(broker)
    return application

40 

41 

@pytest.fixture()
def faststream_tmp_path(tmp_path: "Path"):
    """Return the ``faststream_templates`` directory inside ``tmp_path``.

    The directory is created when it does not exist yet.
    """
    templates_dir = tmp_path.joinpath("faststream_templates")
    templates_dir.mkdir(exist_ok=True)
    return templates_dir

47 

48 

@pytest.fixture()
def generate_template(
    faststream_tmp_path: "Path",
) -> interfaces.GenerateTemplateFactory:
    """Provide a context-manager factory that writes a temporary source file.

    The returned ``factory(code, filename="temp_app.py")`` dedents and strips
    ``code``, writes it as UTF-8 to ``faststream_tmp_path / filename``, yields
    the resulting path, and removes the file again when the context exits.
    """

    @contextmanager
    def factory(
        code: str,
        filename: str = "temp_app.py",
    ) -> Generator["Path", None, None]:
        target: Path = faststream_tmp_path.joinpath(filename)
        target.write_text(dedent(code).strip(), encoding="utf-8")

        try:
            yield target
        finally:
            # A file that has already disappeared is not treated as an error.
            target.unlink(missing_ok=True)

    return factory

69 

70 

class CLIThread:
    """Run a command in a subprocess and collect its stderr in the background.

    The child process is spawned on construction with stderr piped in text
    mode. A helper thread appends the stripped stderr lines to ``self.stderr``
    and sets ``self.started`` once the pipe first becomes readable.
    """

    def __init__(
        self,
        command: tuple[str, ...],
        env: dict[str, str],
    ) -> None:
        """Spawn ``command`` with environment ``env`` and start the poller."""
        self.process = subprocess.Popen(
            command,
            stderr=subprocess.PIPE,
            text=True,
            shell=False,
            env=env,
        )
        # Cleared by ``stop()`` to ask the poller loop to finish.
        self.running = True
        # Set by the poller as soon as the stderr pipe first becomes readable.
        self.started = False

        # Stripped stderr lines, concatenated without any separator.
        self.stderr = ""

        self.__std_poll_thread = threading.Thread(target=self._poll_std)
        self.__std_poll_thread.start()

    def _poll_std(self) -> None:
        """Append the child's stderr lines to ``self.stderr`` until stopped.

        Returns immediately when ``IS_WINDOWS`` is true, so nothing is
        collected in the background on that platform.
        """
        assert self.process.stderr

        if IS_WINDOWS:
            # Presumably because select() cannot wait on pipes there - confirm.
            return

        # NOTE(review): readline() may buffer more than one line while
        # select() only watches the OS-level descriptor, so lines that are
        # already buffered might be picked up late - confirm whether it matters.
        while self.running:
            rlist, _, _ = select.select([self.process.stderr], [], [], 0.1)
            if rlist:
                self.started = True

                if line := self.process.stderr.readline():
                    self.stderr += line.strip()

                else:
                    # An empty read means EOF on the pipe.
                    break

            elif self.process.poll() is not None:
                # Nothing to read and the child has already exited.
                break

    def wait_for_stderr(self, message: str, timeout: float = 2.0) -> bool:
        """Wait until ``message`` appears in the collected stderr.

        Args:
            message: Substring to look for in the accumulated stderr text.
            timeout: Maximum number of seconds to keep checking.

        Returns:
            ``True`` as soon as ``message`` is found. ``False`` when the
            timeout elapses, or when the child has exited and its remaining
            output does not contain ``message`` either.
        """
        assert self.process.stderr

        if message in self.stderr:
            return True

        # A monotonic clock keeps the timeout immune to system clock changes.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            time.sleep(0.1)
            if message in self.stderr:
                return True

            # poll() refreshes the exit status; ``returncode`` alone stays
            # ``None`` until somebody polls or waits on the process.
            if self.process.poll() is not None:
                # Give the poller a bounded chance to flush what it is reading,
                # then drain the rest ourselves. Reading while the poller is
                # still alive would race with it on the same stream.
                self.__std_poll_thread.join(timeout=1.0)
                poller_done = not self.__std_poll_thread.is_alive()
                if poller_done and not self.process.stderr.closed:
                    self.stderr += self.process.stderr.read()
                return message in self.stderr

        return False

    def wait(self, timeout: float) -> None:
        """Block until the child exits.

        Raises:
            subprocess.TimeoutExpired: if it is still running after ``timeout``
                seconds.
        """
        self.process.wait(timeout)

    def signint(self) -> None:
        """Interrupt the child: ``SIGINT``, or ``terminate()`` on Windows."""
        if IS_WINDOWS:
            self.process.terminate()
        else:
            self.process.send_signal(signal.SIGINT)

    def stop(self) -> None:
        """Terminate the child, reap it, stop the poller and close the pipe."""
        self.process.terminate()

        # Wait for the child while the poller is still draining stderr, and
        # escalate to kill() before joining, so a child that ignores the
        # termination request cannot stall the join below.
        try:
            self.wait(5)

        except subprocess.TimeoutExpired:
            self.process.kill()
            # Reap the killed child so it does not linger as a zombie.
            self.process.wait()

        self.running = False
        with suppress(Exception):
            self.__std_poll_thread.join()

        # Release the pipe explicitly instead of leaving it to the GC.
        if self.process.stderr is not None:
            self.process.stderr.close()

151 

152 

@pytest.fixture()
def faststream_cli(
    faststream_tmp_path: Path,
) -> interfaces.FastStreamCLIFactory:
    """Provide a context-manager factory that runs a CLI command in a subprocess.

    ``cli_factory(*cmd, wait_time=2.0, extra_env=None)`` starts ``cmd`` through
    ``CLIThread``, waits up to ``wait_time`` seconds for it to start, yields the
    ``CLIThread`` and always stops it when the context exits.

    The child inherits the current environment with ``faststream_tmp_path``
    prepended to ``PATH`` and used as ``PYTHONPATH``; entries of ``extra_env``
    take precedence over both.
    """

    @contextmanager
    def cli_factory(
        *cmd: str,
        wait_time: float = 2.0,
        extra_env: dict[str, str] | None = None,
    ) -> Generator[CLIThread, None, None]:
        # Join with the platform separator and tolerate an unset PATH instead
        # of hard-coding ":" and raising KeyError.
        search_path = [str(faststream_tmp_path)]
        if inherited_path := os.environ.get("PATH"):
            search_path.append(inherited_path)

        env = (
            os.environ.copy()
            | {
                "PATH": os.pathsep.join(search_path),
                "PYTHONPATH": str(faststream_tmp_path),
            }
            | (extra_env or {})
        )

        cli = CLIThread(cmd, env)

        try:
            # Inside the try block so the child is stopped even when the
            # startup wait is interrupted.
            wait_for_startup(cli, wait_time)
            yield cli
        finally:
            cli.stop()

    return cli_factory

182 

183 

def wait_for_startup(
    cli: "CLIThread",
    timeout: float = 10,
    check_interval: float = 0.1,
) -> None:
    """Block until ``cli`` has started, its process has exited, or time runs out.

    Args:
        cli: Object exposing a ``started`` flag and a ``process.poll()`` method.
        timeout: Maximum number of seconds to wait.
        check_interval: Seconds to sleep between two checks.

    Returns:
        ``None`` in every case; callers that need the outcome must inspect
        ``cli.started`` or the process themselves.
    """
    # A monotonic clock keeps the timeout immune to system clock adjustments.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        if cli.started:
            return

        # The process ended before producing any output; waiting longer is
        # pointless.
        if cli.process.poll() is not None:
            return

        time.sleep(check_interval)