Coverage for faststream / _internal / cli / main.py: 57%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import logging 

2import os 

3import sys 

4import warnings 

5from contextlib import suppress 

6from pathlib import Path 

7from typing import TYPE_CHECKING, Any, Literal, cast 

8 

9import anyio 

10import typer 

11 

12from faststream import FastStream 

13from faststream.__about__ import __version__ 

14from faststream._internal._compat import IS_WINDOWS, json_loads 

15from faststream._internal.application import Application 

16from faststream.asgi import AsgiFastStream 

17from faststream.exceptions import INSTALL_WATCHFILES, SetupError, StartupValidationError 

18 

19from .docs import docs_app 

20from .dto import RunArgs 

21from .options import ( 

22 APP_ARGUMENT, 

23 APP_DIR_OPTION, 

24 FACTORY_OPTION, 

25 RELOAD_EXTENSIONS_OPTION, 

26 RELOAD_FLAG, 

27) 

28from .utils.imports import import_from_string 

29from .utils.logs import ( 

30 LogFiles, 

31 LogLevels, 

32 get_log_level, 

33 set_log_config, 

34 set_log_level, 

35) 

36from .utils.parser import parse_cli_args 

37 

38if TYPE_CHECKING: 

39 from faststream._internal.broker import BrokerUsecase 

40 

41rich_mode = os.getenv("FASTSTREAM_CLI_RICH_MODE", "rich") 

42if rich_mode == "none": 42 ↛ 43line 42 didn't jump to line 43 because the condition on line 42 was never true

43 rich_markup_mode: Literal["markdown", "rich"] | None = None 

44elif rich_mode in {"md", "markdown"}: 44 ↛ 45line 44 didn't jump to line 45 because the condition on line 44 was never true

45 rich_markup_mode = "markdown" 

46elif rich_mode == "rich": 46 ↛ 49line 46 didn't jump to line 49 because the condition on line 46 was always true

47 rich_markup_mode = "rich" 

48else: 

49 msg = f"Invalid rich mode: {rich_mode}" 

50 raise ValueError(msg) 

51 

# Root Typer application for the FastStream CLI. `rich_markup_mode` is the
# value resolved above from the FASTSTREAM_CLI_RICH_MODE environment variable.
cli = typer.Typer(pretty_exceptions_short=True, rich_markup_mode=rich_markup_mode)
# Mount the commands defined in `.docs` under the `docs` sub-command name.
cli.add_typer(docs_app, name="docs", help="Documentations commands")

54 

55 

def version_callback(version: bool) -> None:
    """Print version details and stop the CLI when ``version`` is truthy.

    The echoed line contains the FastStream version, the Python
    implementation and version, and the operating system name. Nothing
    happens when ``version`` is falsy.

    Raises:
        typer.Exit: after the version line has been printed.
    """
    if not version:
        return

    # Imported lazily: only needed when the version banner is requested.
    import platform

    banner = (
        f"Running FastStream {__version__} with {platform.python_implementation()} "
        f"{platform.python_version()} on {platform.system()}"
    )
    typer.echo(banner)

    raise typer.Exit

67 

68 

def loop_callback(value: str) -> str:
    """Validate the ``--loop`` option value and return it unchanged.

    ``"auto"`` is accepted as-is. Any other value is passed to
    ``import_from_string`` here, so that an unimportable loop factory is
    reported while options are parsed rather than later at startup.
    """
    if value == "auto":
        return value

    # Result is discarded: the import is attempted only for validation.
    import_from_string(value)
    return value

74 

75 

# Root callback of the CLI application. The function body is intentionally
# empty: the only option, `-v/--version`, is handled by `version_callback`,
# which is attached to the option below.
# NOTE(review): the docstring is presumably shown by Typer as the CLI help
# text, so treat it as user-facing output — confirm before rewording.
@cli.callback()
def main(
    version: bool | None = typer.Option(
        False,
        "-v",
        "--version",
        callback=version_callback,
        is_eager=True,
        help="Show current platform, python and FastStream version.",
    ),
) -> None:
    """Generate, run and manage FastStream apps to greater development experience."""

88 

89 

# NOTE(review): the docstring below is presumably rendered by Typer as the
# command help, so it is kept verbatim; implementation notes live in comments.
@cli.command(
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
def run(
    ctx: typer.Context,
    app: str = APP_ARGUMENT,
    workers: int = typer.Option(
        1,
        "-w",
        "--workers",
        show_default=False,
        help="Run [workers] applications with process spawning.",
        envvar="FASTSTREAM_WORKERS",
    ),
    app_dir: str = APP_DIR_OPTION,
    is_factory: bool = FACTORY_OPTION,
    reload: bool = RELOAD_FLAG,
    watch_extensions: list[str] = RELOAD_EXTENSIONS_OPTION,
    loop: str = typer.Option(
        "auto",
        "--loop",
        callback=loop_callback,
        help=("Event loop factory implementation."),
        envvar="FASTSTREAM_LOOP",
    ),
    log_level: LogLevels = typer.Option(
        LogLevels.notset,
        "-l",
        "--log-level",
        case_sensitive=False,
        help="Set selected level for FastStream and brokers logger objects.",
        envvar="FASTSTREAM_LOG_LEVEL",
        show_default=False,
    ),
    log_config: Path | None = typer.Option(
        None,
        "--log-config",
        help=(
            "Set file to configure logging. Support "
            f"{', '.join(f'`{x.value}`' for x in LogFiles)} extensions."  # noqa: B008
        ),
        show_default=False,
    ),
) -> None:
    """Run [MODULE:APP] FastStream application."""
    # Extra watch extensions are only consumed by the reloader; tell the user
    # when they were passed without `--reload`.
    if watch_extensions and not reload:
        typer.echo(
            "Extra reload extensions has no effect without `--reload` flag."
            "\nProbably, you forgot it?",
        )

    # Unknown CLI arguments collected by the context are parsed into `extra`.
    app, extra = parse_cli_args(app, *ctx.args)
    resolved_level = get_log_level(log_level)

    if app_dir:  # pragma: no branch
        sys.path.insert(0, app_dir)

    # The application must be imported only after sys.path was extended.
    module_path, imported = import_from_string(app, is_factory=is_factory)
    app_obj = cast("Application", imported)

    if reload and workers > 1:
        msg = "You can't use reload option with multiprocessing"
        raise SetupError(msg)

    run_args = RunArgs(
        app,
        extra_options={"worker_id": None, **extra},
        is_factory=is_factory,
        log_config=log_config,
        log_level=resolved_level,
        loop=loop,
    )

    # --- reload mode -------------------------------------------------------
    if reload:
        try:
            from faststream._internal.cli.supervisors.watchfiles import WatchReloader
        except ImportError:
            # Reloader is unavailable: warn and run once without reloading.
            warnings.warn(INSTALL_WATCHFILES, category=ImportWarning, stacklevel=1)
            _run(run_args)
            return

        watched_dirs = [str(module_path)] if module_path else []
        if app_dir != ".":
            watched_dirs.append(app_dir)

        WatchReloader(
            target=_run,
            args=run_args,
            reload_dirs=watched_dirs,
            extra_extensions=watch_extensions,
        ).run()
        return

    # --- single process ----------------------------------------------------
    if workers <= 1:
        _run_imported_app(app_obj, args=run_args)
        return

    # --- several workers: the supervisor depends on the application type ----
    if isinstance(app_obj, FastStream):
        from faststream._internal.cli.supervisors.multiprocess import Multiprocess

        run_args.app_level = logging.DEBUG

        Multiprocess(
            target=_run,
            args=run_args,
            workers=workers,
        ).run()
        return

    if isinstance(app_obj, AsgiFastStream):
        from faststream._internal.cli.supervisors.asgi_multiprocess import (
            ASGIMultiprocess,
        )

        ASGIMultiprocess(
            target=app,
            args=run_args,
            workers=workers,
        ).run()
        return

    msg = f"Unexpected app type, expected FastStream or AsgiFastStream, got: {type(app_obj)}."
    raise typer.BadParameter(msg)

214 

215 

def _run(args: RunArgs) -> None:
    """Import the application named by ``args.app`` and run it.

    The import honours ``args.is_factory``; the imported object is then
    handed to ``_run_imported_app`` together with the same ``args``.
    """
    _module_path, loaded = import_from_string(args.app, is_factory=args.is_factory)
    _run_imported_app(cast("Application", loaded), args=args)

221 

222 

def _loop_backend_options(loop: str) -> dict[str, Any]:
    """Build the ``backend_options`` passed to ``anyio.run`` for ``loop``."""
    if loop != "auto":
        # An explicit loop factory was requested: import and use it.
        _, loop_factory = import_from_string(loop)
        return {"loop_factory": loop_factory}

    options: dict[str, Any] = {}
    if not IS_WINDOWS:  # pragma: no cover
        # "auto" off Windows: use uvloop when it can be imported.
        with suppress(ImportError):
            import uvloop

            options["loop_factory"] = uvloop.new_event_loop
    return options


def _run_imported_app(app_obj: "Application", args: RunArgs) -> None:
    """Apply logging settings and run an already imported application.

    Steps, in order: verify that ``app_obj`` is an ``Application``; apply
    ``args.log_level`` when it is greater than zero; apply ``args.log_config``
    when it is set; pick the event loop factory from ``args.loop``; run
    ``app_obj.run`` under ``anyio.run``.

    Raises:
        typer.BadParameter: if ``app_obj`` is not an ``Application``.
        SystemExit: with status 1 after a ``StartupValidationError`` has been
            rendered via ``draw_startup_errors``.
    """
    if not isinstance(app_obj, Application):
        msg = f'Imported object "{app_obj}" must be "Application" type.'
        raise typer.BadParameter(
            msg,
        )

    if args.log_level > 0:
        set_log_level(args.log_level, app_obj)

    if args.log_config is not None:
        set_log_config(args.log_config)

    loop_options = _loop_backend_options(args.loop)

    try:
        anyio.run(
            app_obj.run,
            args.app_level,
            args.extra_options,
            backend_options=loop_options,
        )

    except StartupValidationError as validation_error:
        # Imported lazily: only needed on the failure path.
        from faststream._internal.cli.utils.errors import draw_startup_errors

        draw_startup_errors(validation_error)
        sys.exit(1)

260 

261 

# NOTE(review): the docstring below is presumably rendered by Typer as the
# command help, so it is kept verbatim; implementation notes live in comments.
@cli.command(
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
def publish(
    ctx: typer.Context,
    app: str = APP_ARGUMENT,
    app_dir: str = APP_DIR_OPTION,
    message: str = typer.Argument(
        ...,
        help="JSON Message string to publish.",
        show_default=False,
    ),
    rpc: bool = typer.Option(
        False,
        help="Enable RPC mode and system output.",
    ),
    is_factory: bool = FACTORY_OPTION,
) -> None:
    """Publish a message using the specified broker in a FastStream application.

    This command publishes a message to a broker configured in a FastStream app instance.
    It supports various brokers and can handle extra arguments specific to each broker type.
    These are parsed and passed to the broker's publish method.
    """
    # Unknown CLI arguments collected by the context are parsed into `extra`
    # and forwarded to the broker call as keyword arguments.
    app, extra = parse_cli_args(app, *ctx.args)

    if app_dir:  # pragma: no branch
        sys.path.insert(0, app_dir)

    publish_extra: dict[str, Any] = extra.copy()

    try:
        # Converted inside the `try` so that a malformed value is reported as
        # a regular "Publish error" instead of an unhandled traceback.
        if "timeout" in publish_extra:
            publish_extra["timeout"] = float(publish_extra["timeout"])

        _, app_obj = import_from_string(app, is_factory=is_factory)

        # Explicit check instead of `assert`: assertions are removed when
        # Python runs with -O, which would silently skip this validation.
        if not isinstance(app_obj, Application):
            msg = f'Imported object "{app_obj}" must be "Application" type.'
            raise TypeError(msg)

        if not app_obj.broker:
            msg = "Broker instance not found in the app."
            raise ValueError(msg)

        result = anyio.run(publish_message, app_obj.broker, rpc, message, publish_extra)

        # Only RPC mode has a response worth printing.
        if rpc:
            typer.echo(result)

    except Exception as e:
        # Top-level CLI boundary: report the problem and exit with status 1.
        typer.echo(f"Publish error: {e}")
        sys.exit(1)

312 

313 

async def publish_message(
    broker: "BrokerUsecase[Any, Any]",
    rpc: bool,
    message: str,
    extra: dict[str, Any],
) -> Any:
    """Send ``message`` through ``broker`` and return the broker call result.

    The message is decoded with ``json_loads`` when possible; if decoding
    fails for any reason the original string is sent unchanged. Inside the
    broker's async context, ``broker.request`` is awaited when ``rpc`` is
    true and ``broker.publish`` otherwise, with ``extra`` passed as keyword
    arguments.

    Raises:
        SystemExit: with status 1 after echoing the error, if entering the
            broker context or the broker call raises an ``Exception``.
    """
    payload: Any = message
    # Best effort: payloads that are not valid JSON are sent as plain strings.
    with suppress(Exception):
        payload = json_loads(message)

    try:
        async with broker:
            if not rpc:
                return await broker.publish(payload, **extra)  # type: ignore[call-arg]
            return await broker.request(payload, **extra)  # type: ignore[call-arg]

    except Exception as exc:
        typer.echo(f"Error when broker was publishing: {exc!r}")
        sys.exit(1)