Coverage for faststream / _internal / cli / docs.py: 8%

52 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import json 

2import sys 

3import warnings 

4from contextlib import suppress 

5from pathlib import Path 

6from pprint import pformat 

7from typing import TYPE_CHECKING, cast 

8 

9import typer 

10from pydantic import ValidationError 

11 

12from faststream._internal._compat import json_dumps, model_parse 

13from faststream._internal.cli.utils.imports import import_from_string 

14from faststream.exceptions import INSTALL_WATCHFILES, INSTALL_YAML, SCHEMA_NOT_SUPPORTED 

15from faststream.specification.asyncapi.site import serve_app 

16from faststream.specification.asyncapi.v2_6_0.schema import ( 

17 ApplicationSchema as SchemaV2_6, 

18) 

19from faststream.specification.asyncapi.v3_0_0.schema import ( 

20 ApplicationSchema as SchemaV3, 

21) 

22 

23from .dto import RunArgs 

24from .options import ( 

25 APP_ARGUMENT, 

26 APP_DIR_OPTION, 

27 FACTORY_OPTION, 

28 RELOAD_EXTENSIONS_OPTION, 

29 RELOAD_FLAG, 

30) 

31 

32if TYPE_CHECKING: 

33 from collections.abc import Sequence 

34 

35 from faststream.specification.base import SpecificationFactory 

36 

37 

38docs_app = typer.Typer(pretty_exceptions_short=True) 

39 

40 

41@docs_app.command(name="serve") 

42def serve( 

43 docs: str = typer.Argument( 

44 ..., 

45 help="[python_module:FastStream] or [asyncapi.json/.yaml] - path to your application or documentation.", 

46 show_default=False, 

47 ), 

48 host: str = typer.Option( 

49 "localhost", 

50 help="Documentation hosting address.", 

51 ), 

52 port: int = typer.Option( 

53 8000, 

54 help="Documentation hosting port.", 

55 ), 

56 app_dir: str = APP_DIR_OPTION, 

57 is_factory: bool = FACTORY_OPTION, 

58 reload: bool = RELOAD_FLAG, 

59 watch_extensions: list[str] = RELOAD_EXTENSIONS_OPTION, 

60) -> None: 

61 """Serve project AsyncAPI schema.""" 

62 if ":" in docs: 

63 if app_dir: # pragma: no branch 

64 sys.path.insert(0, app_dir) 

65 

66 module, _ = import_from_string(docs, is_factory=is_factory) 

67 

68 module_parent = module.parent 

69 extra_extensions: Sequence[str] = watch_extensions 

70 

71 else: 

72 module_parent = Path.cwd() 

73 schema_filepath = module_parent / docs 

74 extra_extensions = (schema_filepath.suffix, *watch_extensions) 

75 

76 run_args = RunArgs( 

77 app=docs, 

78 extra_options={"host": host, "port": port}, 

79 is_factory=is_factory, 

80 ) 

81 

82 if reload: 

83 try: 

84 from faststream._internal.cli.supervisors.watchfiles import WatchReloader 

85 

86 except ImportError: 

87 warnings.warn(INSTALL_WATCHFILES, category=ImportWarning, stacklevel=1) 

88 _parse_and_serve(run_args) 

89 

90 else: 

91 WatchReloader( 

92 target=_parse_and_serve, 

93 args=run_args, 

94 reload_dirs=(str(module_parent),), 

95 extra_extensions=extra_extensions, 

96 ).run() 

97 

98 else: 

99 _parse_and_serve(run_args) 

100 

101 

102@docs_app.command(name="gen") 

103def gen( 

104 app: str = APP_ARGUMENT, 

105 yaml: bool = typer.Option( 

106 False, 

107 "-y", 

108 "--yaml", 

109 help="Generate `asyncapi.yaml` schema.", 

110 ), 

111 out: str | None = typer.Option( 

112 None, 

113 "-o", 

114 "--out", 

115 help="Output filename.", 

116 show_default="asyncapi.json/.yaml", 

117 ), 

118 debug: bool = typer.Option( 

119 False, 

120 "-d", 

121 "--debug", 

122 help="Do not save generated schema to file. Print it instead.", 

123 ), 

124 app_dir: str = APP_DIR_OPTION, 

125 is_factory: bool = FACTORY_OPTION, 

126) -> None: 

127 """Generate project AsyncAPI schema.""" 

128 if app_dir: # pragma: no branch 

129 sys.path.insert(0, app_dir) 

130 

131 _, app_obj = import_from_string(app, is_factory=is_factory) 

132 schema_factory = cast( 

133 "SpecificationFactory | None", 

134 getattr(app_obj, "schema", None), 

135 ) 

136 if not schema_factory: 

137 msg = f"{app_obj} doesn't have `schema` attribute" 

138 raise ValueError(msg) 

139 

140 raw_schema = schema_factory.to_specification() 

141 

142 if yaml: 

143 try: 

144 schema = raw_schema.to_yaml() 

145 except ImportError as e: # pragma: no cover 

146 typer.echo(INSTALL_YAML, err=True) 

147 raise typer.Exit(1) from e 

148 

149 filename = out or "asyncapi.yaml" 

150 

151 if not debug: 

152 Path(filename).write_text(schema, encoding="utf-8") 

153 else: 

154 schema = raw_schema.to_jsonable() 

155 filename = out or "asyncapi.json" 

156 

157 if not debug: 

158 with Path(filename).open("w", encoding="utf-8") as f: 

159 json.dump(schema, f, indent=2) 

160 

161 else: 

162 schema = pformat(schema) 

163 

164 if debug: 

165 typer.echo("Generated schema:\n") 

166 typer.echo(schema, color=True) 

167 

168 else: 

169 typer.echo(f"Your project AsyncAPI scheme was placed to `{filename}`") 

170 

171 

172def _parse_and_serve(args: RunArgs) -> None: 

173 if ":" in args.app: 

174 _, app_obj = import_from_string(args.app, is_factory=args.is_factory) 

175 schema_factory = cast( 

176 "SpecificationFactory | None", 

177 getattr(app_obj, "schema", None), 

178 ) 

179 if not schema_factory: 

180 msg = f"{app_obj} doesn't have `schema` attribute" 

181 raise ValueError(msg) 

182 raw_schema = schema_factory.to_specification() 

183 

184 else: 

185 schema_filepath = Path.cwd() / args.app 

186 

187 if schema_filepath.suffix == ".json": 

188 data = schema_filepath.read_bytes() 

189 

190 elif schema_filepath.suffix in {".yaml", ".yml"}: 

191 try: 

192 import yaml 

193 except ImportError as e: # pragma: no cover 

194 typer.echo(INSTALL_YAML, err=True) 

195 raise typer.Exit(1) from e 

196 

197 with schema_filepath.open("r") as f: 

198 schema = yaml.safe_load(f) 

199 

200 data = json_dumps(schema) 

201 

202 else: 

203 msg = f"Unknown extension given - {args.app}; Please provide app in format [python_module:Specification] or [asyncapi.yaml/.json] - path to your application or documentation" 

204 raise ValueError(msg) 

205 

206 for schema in (SchemaV3, SchemaV2_6): 

207 with suppress(ValidationError): 

208 raw_schema = model_parse(schema, data) 

209 break 

210 else: 

211 typer.echo(SCHEMA_NOT_SUPPORTED.format(schema_filename=args.app), err=True) 

212 raise typer.Exit(1) 

213 

214 serve_app( 

215 raw_schema, 

216 cast("str", args.extra_options.get("host", "localhost")), 

217 cast("int", args.extra_options.get("port", 8000)), 

218 )