Coverage for faststream / _internal / cli / main.py: 57%
87 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import logging
2import os
3import sys
4import warnings
5from contextlib import suppress
6from pathlib import Path
7from typing import TYPE_CHECKING, Any, Literal, cast
9import anyio
10import typer
12from faststream import FastStream
13from faststream.__about__ import __version__
14from faststream._internal._compat import IS_WINDOWS, json_loads
15from faststream._internal.application import Application
16from faststream.asgi import AsgiFastStream
17from faststream.exceptions import INSTALL_WATCHFILES, SetupError, StartupValidationError
19from .docs import docs_app
20from .dto import RunArgs
21from .options import (
22 APP_ARGUMENT,
23 APP_DIR_OPTION,
24 FACTORY_OPTION,
25 RELOAD_EXTENSIONS_OPTION,
26 RELOAD_FLAG,
27)
28from .utils.imports import import_from_string
29from .utils.logs import (
30 LogFiles,
31 LogLevels,
32 get_log_level,
33 set_log_config,
34 set_log_level,
35)
36from .utils.parser import parse_cli_args
38if TYPE_CHECKING:
39 from faststream._internal.broker import BrokerUsecase
41rich_mode = os.getenv("FASTSTREAM_CLI_RICH_MODE", "rich")
42if rich_mode == "none": 42 ↛ 43line 42 didn't jump to line 43 because the condition on line 42 was never true
43 rich_markup_mode: Literal["markdown", "rich"] | None = None
44elif rich_mode in {"md", "markdown"}: 44 ↛ 45line 44 didn't jump to line 45 because the condition on line 44 was never true
45 rich_markup_mode = "markdown"
46elif rich_mode == "rich": 46 ↛ 49line 46 didn't jump to line 49 because the condition on line 46 was always true
47 rich_markup_mode = "rich"
48else:
49 msg = f"Invalid rich mode: {rich_mode}"
50 raise ValueError(msg)
52cli = typer.Typer(pretty_exceptions_short=True, rich_markup_mode=rich_markup_mode)
53cli.add_typer(docs_app, name="docs", help="Documentations commands")
def version_callback(version: bool) -> None:
    """Echo version information and stop the CLI when ``version`` is truthy.

    Args:
        version: Value of the ``--version`` flag. A falsy value makes this
            callback a no-op.

    Raises:
        typer.Exit: After the version line has been echoed.
    """
    if not version:
        return

    # Imported lazily: only needed on the path that prints the banner.
    from platform import python_implementation, python_version, system

    banner = (
        f"Running FastStream {__version__} with {python_implementation()} "
        f"{python_version()} on {system()}"
    )
    typer.echo(banner)

    raise typer.Exit
def loop_callback(value: str) -> str:
    """Check the ``--loop`` option value and return it unchanged.

    ``"auto"`` is accepted as is. Any other value is passed to
    ``import_from_string`` so that a bad import string fails here, at option
    parsing time, with a more informative error; whatever that call raises
    propagates to the caller.
    """
    if value == "auto":
        return value

    # The imported object is discarded: this call is only a validation step.
    import_from_string(value)
    return value
@cli.callback()
def main(
    version: bool | None = typer.Option(
        False,
        "-v",
        "--version",
        # Eager so the version callback runs before other parameters are processed.
        is_eager=True,
        callback=version_callback,
        help="Show current platform, python and FastStream version.",
    ),
) -> None:
    """Generate, run and manage FastStream apps to greater development experience."""
    # NOTE: Typer shows the docstring above as the CLI help text, so its wording
    # is part of the program output. The body is intentionally empty: the only
    # work at this level is done by ``version_callback``.
@cli.command(
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
def run(
    ctx: typer.Context,
    app: str = APP_ARGUMENT,
    workers: int = typer.Option(
        1,
        "-w",
        "--workers",
        show_default=False,
        help="Run [workers] applications with process spawning.",
        envvar="FASTSTREAM_WORKERS",
    ),
    app_dir: str = APP_DIR_OPTION,
    is_factory: bool = FACTORY_OPTION,
    reload: bool = RELOAD_FLAG,
    watch_extensions: list[str] = RELOAD_EXTENSIONS_OPTION,
    loop: str = typer.Option(
        "auto",
        "--loop",
        callback=loop_callback,
        help=("Event loop factory implementation."),
        envvar="FASTSTREAM_LOOP",
    ),
    log_level: LogLevels = typer.Option(
        LogLevels.notset,
        "-l",
        "--log-level",
        case_sensitive=False,
        help="Set selected level for FastStream and brokers logger objects.",
        envvar="FASTSTREAM_LOG_LEVEL",
        show_default=False,
    ),
    log_config: Path | None = typer.Option(
        None,
        "--log-config",
        help=(
            "Set file to configure logging. Support "
            f"{', '.join(f'`{x.value}`' for x in LogFiles)} extensions."  # noqa: B008
        ),
        show_default=False,
    ),
) -> None:
    """Run [MODULE:APP] FastStream application."""
    # NOTE: the docstring above is shown by Typer as the command help text, so
    # it is kept short; the flow of the command is described in comments.
    #
    # Three execution modes, checked in this order:
    #   1. ``reload``      -> WatchReloader supervisor (plain run if watchfiles is missing)
    #   2. ``workers > 1`` -> Multiprocess / ASGIMultiprocess supervisor
    #   3. otherwise       -> run the already imported application in this process
    if not reload and watch_extensions:
        typer.echo(
            "Extra reload extensions has no effect without `--reload` flag."
            "\nProbably, you forgot it?",
        )

    # Unknown CLI arguments collected by the context are parsed into ``extra``.
    app, extra = parse_cli_args(app, *ctx.args)
    casted_log_level = get_log_level(log_level)

    if app_dir:  # pragma: no branch
        sys.path.insert(0, app_dir)

    # Should be imported after sys.path changes
    module_path, imported = import_from_string(app, is_factory=is_factory)
    app_obj = cast("Application", imported)

    if workers > 1 and reload:
        msg = "You can't use reload option with multiprocessing"
        raise SetupError(msg)

    run_args = RunArgs(
        app,
        extra_options={"worker_id": None, **extra},
        is_factory=is_factory,
        log_config=log_config,
        log_level=casted_log_level,
        loop=loop,
    )

    # --- Mode 1: reload on file changes ------------------------------------
    if reload:
        try:
            from faststream._internal.cli.supervisors.watchfiles import WatchReloader
        except ImportError:
            # The reloader module could not be imported: warn and run once
            # without reloading.
            warnings.warn(INSTALL_WATCHFILES, category=ImportWarning, stacklevel=1)
            _run(run_args)
            return

        watched_dirs = [str(module_path)] if module_path else []
        if app_dir != ".":
            watched_dirs.append(app_dir)

        WatchReloader(
            target=_run,
            args=run_args,
            reload_dirs=watched_dirs,
            extra_extensions=watch_extensions,
        ).run()
        return

    # --- Mode 3: single process --------------------------------------------
    if workers <= 1:
        _run_imported_app(app_obj, args=run_args)
        return

    # --- Mode 2: several worker processes ----------------------------------
    if isinstance(app_obj, FastStream):
        from faststream._internal.cli.supervisors.multiprocess import Multiprocess

        run_args.app_level = logging.DEBUG

        Multiprocess(
            target=_run,
            args=run_args,
            workers=workers,
        ).run()

    elif isinstance(app_obj, AsgiFastStream):
        from faststream._internal.cli.supervisors.asgi_multiprocess import (
            ASGIMultiprocess,
        )

        # Here the supervisor receives the application import string as its
        # target instead of the ``_run`` function.
        ASGIMultiprocess(
            target=app,
            args=run_args,
            workers=workers,
        ).run()

    else:
        msg = f"Unexpected app type, expected FastStream or AsgiFastStream, got: {type(app_obj)}."
        raise typer.BadParameter(msg)
def _run(args: RunArgs) -> None:
    """Import the application named by ``args.app`` and run it.

    Within this file it is used as the ``target`` of the reload and
    multiprocess supervisors and as the fallback when reloading is
    unavailable. The application is imported here from its import string
    rather than received as an object.
    """
    # The first element of the import result (the module path) is not needed here.
    _module_path, loaded = import_from_string(args.app, is_factory=args.is_factory)
    _run_imported_app(cast("Application", loaded), args=args)
def _run_imported_app(app_obj: "Application", args: RunArgs) -> None:
    """Configure logging and the event loop, then run ``app_obj`` until it stops.

    Args:
        app_obj: Already imported application object.
        args: Run settings (log level/config, loop import string, extra options).

    Raises:
        typer.BadParameter: If ``app_obj`` is not an ``Application`` instance.
        SystemExit: With code 1 when the app raises ``StartupValidationError``.
    """
    if not isinstance(app_obj, Application):
        error_text = f'Imported object "{app_obj}" must be "Application" type.'
        raise typer.BadParameter(error_text)

    # Logging setup: an explicit level first, then an optional config file.
    level = args.log_level
    if level > 0:
        set_log_level(level, app_obj)

    config_file = args.log_config
    if config_file is not None:
        set_log_config(config_file)

    # Event loop selection: an explicit import string wins; with "auto",
    # uvloop is used when it can be imported on a non-Windows platform.
    loop_options = {}
    if args.loop == "auto":
        if not IS_WINDOWS:  # pragma: no cover
            try:
                import uvloop
            except ImportError:
                pass
            else:
                loop_options["loop_factory"] = uvloop.new_event_loop
    else:
        _, custom_factory = import_from_string(args.loop)
        loop_options["loop_factory"] = custom_factory

    try:
        anyio.run(
            app_obj.run,
            args.app_level,
            args.extra_options,
            backend_options=loop_options,
        )

    except StartupValidationError as validation_error:
        from faststream._internal.cli.utils.errors import draw_startup_errors

        # Render the validation problems instead of a traceback, then fail.
        draw_startup_errors(validation_error)
        sys.exit(1)
@cli.command(
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True},
)
def publish(
    ctx: typer.Context,
    app: str = APP_ARGUMENT,
    app_dir: str = APP_DIR_OPTION,
    message: str = typer.Argument(
        ...,
        help="JSON Message string to publish.",
        show_default=False,
    ),
    rpc: bool = typer.Option(
        False,
        help="Enable RPC mode and system output.",
    ),
    is_factory: bool = FACTORY_OPTION,
) -> None:
    """Publish a message using the specified broker in a FastStream application.

    This command publishes a message to a broker configured in a FastStream app instance.
    It supports various brokers and can handle extra arguments specific to each broker type.
    These are parsed and passed to the broker's publish method.
    """
    # NOTE: the docstring above is shown by Typer as the command help text, so
    # implementation notes are kept in comments instead.
    #
    # Any failure below is reported as "Publish error: ..." and the process
    # exits with code 1.
    app, extra = parse_cli_args(app, *ctx.args)

    if app_dir:  # pragma: no branch
        sys.path.insert(0, app_dir)

    try:
        # Converted inside the ``try`` so that a malformed ``timeout`` value
        # is reported like every other publish failure instead of escaping
        # as a raw traceback.
        publish_extra: dict[str, Any] = extra.copy()
        if "timeout" in publish_extra:
            publish_extra["timeout"] = float(publish_extra["timeout"])

        _, app_obj = import_from_string(app, is_factory=is_factory)

        # Explicit check rather than ``assert``: assertions are removed when
        # Python runs with ``-O``, which would silently skip this validation.
        if not isinstance(app_obj, Application):
            msg = f'Imported object "{app_obj}" must be "Application" type.'
            raise TypeError(msg)

        if not app_obj.broker:
            msg = "Broker instance not found in the app."
            raise ValueError(msg)

        result = anyio.run(publish_message, app_obj.broker, rpc, message, publish_extra)

        # Only RPC mode prints the broker response.
        if rpc:
            typer.echo(result)

    except Exception as e:
        typer.echo(f"Publish error: {e}")
        sys.exit(1)
async def publish_message(
    broker: "BrokerUsecase[Any, Any]",
    rpc: bool,
    message: str,
    extra: dict[str, Any],
) -> Any:
    """Send ``message`` through ``broker`` and return what the broker call returns.

    Args:
        broker: Broker used as an async context manager for the duration of the call.
        rpc: When true ``broker.request`` is awaited, otherwise ``broker.publish``.
        message: Raw message text; decoded with ``json_loads`` when possible.
        extra: Keyword arguments forwarded to the broker call.

    Raises:
        SystemExit: With code 1 (after echoing the error) if entering the
            broker context or sending the message raises an ``Exception``.
    """
    # Best effort: if decoding fails for any reason the text is sent as given.
    with suppress(Exception):
        message = json_loads(message)

    try:
        async with broker:
            send = broker.request if rpc else broker.publish
            return await send(message, **extra)  # type: ignore[call-arg]

    except Exception as publish_error:
        typer.echo(f"Error when broker was publishing: {publish_error!r}")
        sys.exit(1)