Coverage for tests/cli/test_asyncapi_docs.py: 96%
53 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import json
2from collections.abc import Callable
3from typing import Any, TextIO
5import httpx
6import pytest
7import yaml
9from tests.cli import interfaces
10from tests.marks import require_aiokafka, skip_macos, skip_windows
# AsyncAPI 2.6.0 document in JSON form: one Kafka server ("development"), one
# channel subscribing to the "input_data" topic, and a "DataBasic" payload schema.
# test_serve_asyncapi_docs_from_file writes it to "asyncapi.json" (its
# "json_schema" case) and serves it with `faststream docs serve`.
12json_asyncapi_doc = """
13{
14 "asyncapi": "2.6.0",
15 "defaultContentType": "application/json",
16 "info": {
17 "title": "FastStream",
18 "version": "0.1.0"
19 },
20 "servers": {
21 "development": {
22 "url": "localhost:9092",
23 "protocol": "kafka",
24 "protocolVersion": "auto"
25 }
26 },
27 "channels": {
28 "input_data:OnInputData": {
29 "servers": [
30 "development"
31 ],
32 "bindings": {
33 "kafka": {
34 "topic": "input_data",
35 "bindingVersion": "0.4.0"
36 }
37 },
38 "subscribe": {
39 "message": {
40 "$ref": "#/components/messages/input_data:OnInputData:Message"
41 }
42 }
43 }
44 },
45 "components": {
46 "messages": {
47 "input_data:OnInputData:Message": {
48 "title": "input_data:OnInputData:Message",
49 "correlationId": {
50 "location": "$message.header#/correlation_id"
51 },
52 "payload": {
53 "$ref": "#/components/schemas/DataBasic"
54 }
55 }
56 },
57 "schemas": {
58 "DataBasic": {
59 "properties": {
60 "data": {
61 "type": "number"
62 }
63 },
64 "required": [
65 "data"
66 ],
67 "title": "DataBasic",
68 "type": "object"
69 }
70 }
71 }
72}
73"""
# YAML counterpart of the JSON document above, written to "asyncapi.yaml" by
# test_serve_asyncapi_docs_from_file (its "yaml_schema" case).
# NOTE(review): `bindings: null`, `subscribe: null` and `data: null` are each
# followed by what look like child keys; the intended nesting cannot be
# confirmed from this view -- verify the document parses as the test expects.
75yaml_asyncapi_doc = """
76asyncapi: 2.6.0
77defaultContentType: application/json
78info:
79 title: FastStream
80 version: 0.1.0
81 description: ''
82servers:
83 development:
84 url: 'localhost:9092'
85 protocol: kafka
86 protocolVersion: auto
87channels:
88 'input_data:OnInputData':
89 servers:
90 - development
91 bindings: null
92 kafka:
93 topic: input_data
94 bindingVersion: 0.4.0
95 subscribe: null
96 message:
97 $ref: '#/components/messages/input_data:OnInputData:Message'
98components:
99 messages:
100 'input_data:OnInputData:Message':
101 title: 'input_data:OnInputData:Message'
102 correlationId:
103 location: '$message.header#/correlation_id'
104 payload:
105 $ref: '#/components/schemas/DataBasic'
106 schemas:
107 DataBasic:
108 properties:
109 data: null
110 title: Data
111 type: number
112 required:
113 - data
114 title: DataBasic
115 type: object
116"""
# Source text of a small FastStream application built on KafkaBroker: it
# subscribes to "input_data", publishes to "output_data", and exposes the
# module-level `app` object. The tests in this file pass this string to
# generate_template(...) and then hand "<module stem>:app" to the CLI.
119app_code = """
120from pydantic import BaseModel, Field, NonNegativeFloat
122from faststream import FastStream, Logger
123from faststream.specification import AsyncAPI
124from faststream.kafka import KafkaBroker
127class DataBasic(BaseModel):
128 data: NonNegativeFloat = Field(
129 ..., examples=[0.5], description="Float data example"
130 )
133broker = KafkaBroker("localhost:9092")
134app = FastStream(broker, specification=AsyncAPI())
137@broker.publisher("output_data")
138@broker.subscriber("input_data")
139async def on_input_data(msg: DataBasic, logger: Logger) -> DataBasic:
140 logger.info(msg)
141 return DataBasic(data=msg.data + 1.0)
142"""
@pytest.mark.slow()
@require_aiokafka
@skip_windows
@pytest.mark.parametrize(
    ("commands", "load_schema"),
    (
        pytest.param(
            [],
            json.load,
            id="json",
        ),
        pytest.param(
            ["--yaml"],
            # BaseLoader keeps every scalar as a plain string, which is all the
            # non-empty check below needs.
            lambda f: yaml.load(f, Loader=yaml.BaseLoader),
            id="yaml",
        ),
    ),
)
def test_gen_asyncapi_for_kafka_app(
    commands: list[str],
    generate_template: interfaces.GenerateTemplateFactory,
    faststream_cli: interfaces.FastStreamCLIFactory,
    load_schema: Callable[[TextIO], Any],
) -> None:
    """Check that `faststream docs gen` writes a loadable, non-empty schema.

    Args:
        commands: Extra CLI flags appended to the `docs gen` invocation
            (empty for JSON output, ``["--yaml"]`` for YAML output).
        generate_template: Fixture that materialises ``app_code`` as a module
            and yields its path.
        faststream_cli: Fixture that runs the CLI and exposes its stderr.
        load_schema: Parser matching the output format selected by ``commands``.
    """
    with generate_template(app_code) as app_path:
        # The output file keeps the name "schema.json" for both formats.
        schema_path = app_path.parent / "schema.json"

        try:
            with faststream_cli(
                "faststream",
                "docs",
                "gen",
                f"{app_path.stem}:app",
                "--out",
                str(schema_path),
                *commands,
            ) as cli_thread:
                cli_thread.wait_for_stderr("Your project AsyncAPI scheme")

                assert cli_thread.process
                assert schema_path.exists()

                with schema_path.open() as f:
                    schema = load_schema(f)

                assert schema
        finally:
            # Always remove the generated file so that a failed run cannot leave
            # a stale schema behind that a later run would mistake for its own.
            schema_path.unlink(missing_ok=True)
@pytest.mark.slow()
@skip_windows
def test_gen_wrong_path(faststream_cli: interfaces.FastStreamCLIFactory) -> None:
    """Check that `faststream docs gen` reports an error for a missing module.

    Args:
        faststream_cli: Fixture that runs the CLI and exposes its stderr.
    """
    with faststream_cli("faststream", "docs", "gen", "non_existent:app") as cli:
        assert cli.wait_for_stderr("No such file or directory")
@skip_windows
@skip_macos  # macOS GitHub Actions runners do not allow a process on 0.0.0.0
@require_aiokafka
@pytest.mark.slow()
@pytest.mark.flaky(reruns=3, reruns_delay=1)
def test_serve_asyncapi_docs_from_app(
    generate_template: interfaces.GenerateTemplateFactory,
    faststream_cli: interfaces.FastStreamCLIFactory,
) -> None:
    """Serve the docs for the sample app and fetch the rendered HTML page.

    Args:
        generate_template: Fixture that materialises ``app_code`` as a module
            and yields its path.
        faststream_cli: Fixture that runs the CLI and exposes its stderr.
    """
    with generate_template(app_code) as app_path:
        serve_args = (
            "faststream",
            "docs",
            "serve",
            "--host",
            "0.0.0.0",
            f"{app_path.stem}:app",
        )

        with faststream_cli(*serve_args) as server:
            server.wait_for_stderr("Please, do not use it in production.")

            try:
                page = httpx.get("http://0.0.0.0:8000")
            except Exception as exc:
                # Surface the CLI's stderr so a failed request is diagnosable.
                raise RuntimeError(server.stderr) from exc

            assert "<title>FastStream AsyncAPI</title>" in page.text
            assert page.status_code == 200
@skip_windows
@require_aiokafka
def test_serve_asyncapi_docs_from_app_with_reload(
    generate_template: interfaces.GenerateTemplateFactory,
    faststream_cli: interfaces.FastStreamCLIFactory,
) -> None:
    """Check that `faststream docs serve --reload` announces the HTTP server.

    Args:
        generate_template: Fixture that materialises ``app_code`` as a module
            and yields its path.
        faststream_cli: Fixture that runs the CLI and exposes its stderr.
    """
    with generate_template(app_code) as app_path:
        serve_args = (
            "faststream",
            "docs",
            "serve",
            f"{app_path.stem}:app",
            "--reload",
        )

        with faststream_cli(*serve_args) as server:
            started = server.wait_for_stderr(
                "HTTPServer running on http://localhost:8000"
            )
            # On failure, show the captured stderr as the assertion message.
            assert started, server.stderr
@skip_windows
@skip_macos  # macOS GitHub Actions runners do not allow a process on 0.0.0.0
@require_aiokafka
@pytest.mark.slow()
@pytest.mark.flaky(reruns=3, reruns_delay=1)
@pytest.mark.parametrize(
    ("doc_filename", "doc"),
    (
        pytest.param("asyncapi.json", json_asyncapi_doc, id="json_schema"),
        pytest.param("asyncapi.yaml", yaml_asyncapi_doc, id="yaml_schema"),
    ),
)
def test_serve_asyncapi_docs_from_file(
    doc_filename: str,
    doc: str,
    generate_template: interfaces.GenerateTemplateFactory,
    faststream_cli: interfaces.FastStreamCLIFactory,
) -> None:
    """Serve a ready-made AsyncAPI document file and fetch the rendered page.

    Args:
        doc_filename: Name of the file the document is written to.
        doc: Text of the AsyncAPI document (JSON or YAML).
        generate_template: Fixture that writes ``doc`` to ``doc_filename`` and
            yields the resulting path.
        faststream_cli: Fixture that runs the CLI and exposes its stderr.
    """
    with generate_template(doc, filename=doc_filename) as doc_path:
        serve_args = (
            "faststream",
            "docs",
            "serve",
            "--host",
            "0.0.0.0",
            str(doc_path),
        )

        with faststream_cli(*serve_args) as server:
            server.wait_for_stderr("Please, do not use it in production.")

            try:
                page = httpx.get("http://0.0.0.0:8000")
            except Exception as exc:
                # Surface the CLI's stderr so a failed request is diagnosable.
                raise RuntimeError(server.stderr) from exc

            assert "<title>FastStream AsyncAPI</title>" in page.text
            assert page.status_code == 200