Coverage for tests / cli / test_asyncapi_docs.py: 96%

53 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import json 

2from collections.abc import Callable 

3from typing import Any, TextIO 

4 

5import httpx 

6import pytest 

7import yaml 

8 

9from tests.cli import interfaces 

10from tests.marks import require_aiokafka, skip_macos, skip_windows 

11 

12json_asyncapi_doc = """ 

13{ 

14 "asyncapi": "2.6.0", 

15 "defaultContentType": "application/json", 

16 "info": { 

17 "title": "FastStream", 

18 "version": "0.1.0" 

19 }, 

20 "servers": { 

21 "development": { 

22 "url": "localhost:9092", 

23 "protocol": "kafka", 

24 "protocolVersion": "auto" 

25 } 

26 }, 

27 "channels": { 

28 "input_data:OnInputData": { 

29 "servers": [ 

30 "development" 

31 ], 

32 "bindings": { 

33 "kafka": { 

34 "topic": "input_data", 

35 "bindingVersion": "0.4.0" 

36 } 

37 }, 

38 "subscribe": { 

39 "message": { 

40 "$ref": "#/components/messages/input_data:OnInputData:Message" 

41 } 

42 } 

43 } 

44 }, 

45 "components": { 

46 "messages": { 

47 "input_data:OnInputData:Message": { 

48 "title": "input_data:OnInputData:Message", 

49 "correlationId": { 

50 "location": "$message.header#/correlation_id" 

51 }, 

52 "payload": { 

53 "$ref": "#/components/schemas/DataBasic" 

54 } 

55 } 

56 }, 

57 "schemas": { 

58 "DataBasic": { 

59 "properties": { 

60 "data": { 

61 "type": "number" 

62 } 

63 }, 

64 "required": [ 

65 "data" 

66 ], 

67 "title": "DataBasic", 

68 "type": "object" 

69 } 

70 } 

71 } 

72} 

73""" 

74 

75yaml_asyncapi_doc = """ 

76asyncapi: 2.6.0 

77defaultContentType: application/json 

78info: 

79 title: FastStream 

80 version: 0.1.0 

81 description: '' 

82servers: 

83 development: 

84 url: 'localhost:9092' 

85 protocol: kafka 

86 protocolVersion: auto 

87channels: 

88 'input_data:OnInputData': 

89 servers: 

90 - development 

91 bindings: null 

92 kafka: 

93 topic: input_data 

94 bindingVersion: 0.4.0 

95 subscribe: null 

96 message: 

97 $ref: '#/components/messages/input_data:OnInputData:Message' 

98components: 

99 messages: 

100 'input_data:OnInputData:Message': 

101 title: 'input_data:OnInputData:Message' 

102 correlationId: 

103 location: '$message.header#/correlation_id' 

104 payload: 

105 $ref: '#/components/schemas/DataBasic' 

106 schemas: 

107 DataBasic: 

108 properties: 

109 data: null 

110 title: Data 

111 type: number 

112 required: 

113 - data 

114 title: DataBasic 

115 type: object 

116""" 

117 

118 

119app_code = """ 

120from pydantic import BaseModel, Field, NonNegativeFloat 

121 

122from faststream import FastStream, Logger 

123from faststream.specification import AsyncAPI 

124from faststream.kafka import KafkaBroker 

125 

126 

127class DataBasic(BaseModel): 

128 data: NonNegativeFloat = Field( 

129 ..., examples=[0.5], description="Float data example" 

130 ) 

131 

132 

133broker = KafkaBroker("localhost:9092") 

134app = FastStream(broker, specification=AsyncAPI()) 

135 

136 

137@broker.publisher("output_data") 

138@broker.subscriber("input_data") 

139async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic: 

140 logger.info(msg) 

141 return DataBasic(data=msg.data + 1.0) 

142""" 

143 

144 

145@pytest.mark.slow() 

146@require_aiokafka 

147@skip_windows 

148@pytest.mark.parametrize( 

149 ("commands", "load_schema"), 

150 ( 

151 pytest.param( 

152 [], 

153 json.load, 

154 id="json", 

155 ), 

156 pytest.param( 

157 ["--yaml"], 

158 lambda f: yaml.load(f, Loader=yaml.BaseLoader), 

159 id="yaml", 

160 ), 

161 ), 

162) 

163def test_gen_asyncapi_for_kafka_app( 

164 commands: list[str], 

165 generate_template: interfaces.GenerateTemplateFactory, 

166 faststream_cli: interfaces.FastStreamCLIFactory, 

167 load_schema: Callable[[TextIO], Any], 

168) -> None: 

169 with ( 

170 generate_template(app_code) as app_path, 

171 faststream_cli( 

172 "faststream", 

173 "docs", 

174 "gen", 

175 f"{app_path.stem}:app", 

176 "--out", 

177 str(app_path.parent / "schema.json"), 

178 *commands, 

179 ) as cli_thread, 

180 ): 

181 cli_thread.wait_for_stderr("Your project AsyncAPI scheme") 

182 

183 assert cli_thread.process 

184 

185 schema_path = app_path.parent / "schema.json" 

186 assert schema_path.exists() 

187 

188 with schema_path.open() as f: 

189 schema = load_schema(f) 

190 

191 assert schema 

192 schema_path.unlink() 

193 

194 

195@pytest.mark.slow() 

196@skip_windows 

197def test_gen_wrong_path(faststream_cli) -> None: 

198 with faststream_cli("faststream", "docs", "gen", "non_existent:app") as cli: 

199 assert cli.wait_for_stderr("No such file or directory") 

200 

201 

202@skip_windows 

203@skip_macos # MacOS GHArunner doesn't allow to run 0.0.0.0 process 

204@require_aiokafka 

205@pytest.mark.slow() 

206@pytest.mark.flaky(reruns=3, reruns_delay=1) 

207def test_serve_asyncapi_docs_from_app( 

208 generate_template: interfaces.GenerateTemplateFactory, 

209 faststream_cli: interfaces.FastStreamCLIFactory, 

210) -> None: 

211 with ( 

212 generate_template(app_code) as app_path, 

213 faststream_cli( 

214 "faststream", "docs", "serve", "--host", "0.0.0.0", f"{app_path.stem}:app" 

215 ) as cli, 

216 ): 

217 cli.wait_for_stderr("Please, do not use it in production.") 

218 

219 try: 

220 response = httpx.get("http://0.0.0.0:8000") 

221 except Exception as e: 

222 raise RuntimeError(cli.stderr) from e 

223 

224 assert "<title>FastStream AsyncAPI</title>" in response.text 

225 assert response.status_code == 200 

226 

227 

228@skip_windows 

229@require_aiokafka 

230def test_serve_asyncapi_docs_from_app_with_reload( 

231 generate_template: interfaces.GenerateTemplateFactory, 

232 faststream_cli: interfaces.FastStreamCLIFactory, 

233) -> None: 

234 with ( 

235 generate_template(app_code) as app_path, 

236 faststream_cli( 

237 "faststream", 

238 "docs", 

239 "serve", 

240 f"{app_path.stem}:app", 

241 "--reload", 

242 ) as cli, 

243 ): 

244 assert cli.wait_for_stderr("HTTPServer running on http://localhost:8000"), ( 

245 cli.stderr 

246 ) 

247 

248 

249@skip_windows 

250@skip_macos # MacOS GHA runner doesn't allow to run 0.0.0.0 process 

251@require_aiokafka 

252@pytest.mark.slow() 

253@pytest.mark.flaky(reruns=3, reruns_delay=1) 

254@pytest.mark.parametrize( 

255 ("doc_filename", "doc"), 

256 ( 

257 pytest.param("asyncapi.json", json_asyncapi_doc, id="json_schema"), 

258 pytest.param("asyncapi.yaml", yaml_asyncapi_doc, id="yaml_schema"), 

259 ), 

260) 

261def test_serve_asyncapi_docs_from_file( 

262 doc_filename: str, 

263 doc: str, 

264 generate_template: interfaces.GenerateTemplateFactory, 

265 faststream_cli: interfaces.FastStreamCLIFactory, 

266) -> None: 

267 with ( 

268 generate_template(doc, filename=doc_filename) as doc_path, 

269 faststream_cli( 

270 "faststream", "docs", "serve", "--host", "0.0.0.0", str(doc_path) 

271 ) as cli, 

272 ): 

273 cli.wait_for_stderr("Please, do not use it in production.") 

274 

275 try: 

276 response = httpx.get("http://0.0.0.0:8000") 

277 except Exception as e: 

278 raise RuntimeError(cli.stderr) from e 

279 

280 assert "<title>FastStream AsyncAPI</title>" in response.text 

281 assert response.status_code == 200