Coverage for faststream / specification / asyncapi / v2_6_0 / generate.py: 95%
85 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import warnings
2from collections.abc import Sequence
3from typing import TYPE_CHECKING, Any, Optional, Union
5from faststream._internal._compat import DEF_KEY
6from faststream._internal.constants import ContentTypes
7from faststream.specification.asyncapi.utils import clear_key, move_pydantic_refs
8from faststream.specification.asyncapi.v2_6_0.schema import (
9 ApplicationInfo,
10 ApplicationSchema,
11 Channel,
12 Components,
13 Contact,
14 ExternalDocs,
15 License,
16 Message,
17 Operation,
18 Reference,
19 Server,
20 Tag,
21)
22from faststream.specification.asyncapi.v2_6_0.schema.bindings import (
23 OperationBinding,
24 http as http_bindings,
25)
27if TYPE_CHECKING:
28 from faststream._internal.basic_types import AnyHttpUrl
29 from faststream._internal.broker import BrokerUsecase
30 from faststream._internal.types import ConnectionType, MsgType
31 from faststream.asgi.handlers import HttpHandler
32 from faststream.specification.schema.extra import (
33 Contact as SpecContact,
34 ContactDict,
35 ExternalDocs as SpecDocs,
36 ExternalDocsDict,
37 License as SpecLicense,
38 LicenseDict,
39 Tag as SpecTag,
40 TagDict,
41 )
44def get_app_schema(
45 broker: "BrokerUsecase[Any, Any]",
46 /,
47 title: str,
48 app_version: str,
49 schema_version: str,
50 description: str | None,
51 terms_of_service: Optional["AnyHttpUrl"],
52 contact: Union["SpecContact", "ContactDict", dict[str, Any]] | None,
53 license: Union["SpecLicense", "LicenseDict", dict[str, Any]] | None,
54 identifier: str | None,
55 tags: Sequence[Union["SpecTag", "TagDict", dict[str, Any]]],
56 external_docs: Union["SpecDocs", "ExternalDocsDict", dict[str, Any]] | None,
57 http_handlers: list[tuple[str, "HttpHandler"]],
58) -> ApplicationSchema:
59 """Get the application schema."""
60 servers = get_broker_server(broker)
61 channels = get_broker_channels(broker)
63 messages: dict[str, Message] = {}
64 payloads: dict[str, dict[str, Any]] = {}
66 for channel in channels.values():
67 channel.servers = list(servers.keys())
68 channels.update(get_asgi_routes(http_handlers))
70 for channel_name, ch in channels.items():
71 resolve_channel_messages(ch, channel_name, payloads, messages)
73 return ApplicationSchema(
74 info=ApplicationInfo(
75 title=title,
76 version=app_version,
77 description=description,
78 termsOfService=terms_of_service,
79 contact=Contact.from_spec(contact),
80 license=License.from_spec(license),
81 ),
82 tags=[Tag.from_spec(tag) for tag in tags] or None,
83 externalDocs=ExternalDocs.from_spec(external_docs),
84 asyncapi=schema_version,
85 defaultContentType=ContentTypes.JSON.value,
86 id=identifier,
87 servers=servers,
88 channels=channels,
89 components=Components(
90 messages=messages,
91 schemas=payloads,
92 securitySchemes=None
93 if broker.specification.security is None
94 else broker.specification.security.get_schema(),
95 ),
96 )
99def resolve_channel_messages(
100 channel: Channel,
101 channel_name: str,
102 payloads: dict[str, dict[str, Any]],
103 messages: dict[str, Message],
104) -> None:
105 if channel.subscribe is not None and channel.subscribe.message:
106 assert isinstance(channel.subscribe.message, Message)
108 channel.subscribe.message = _resolve_msg_payloads(
109 channel.subscribe.message,
110 channel_name,
111 payloads,
112 messages,
113 )
115 if channel.publish is not None:
116 assert isinstance(channel.publish.message, Message)
118 channel.publish.message = _resolve_msg_payloads(
119 channel.publish.message,
120 channel_name,
121 payloads,
122 messages,
123 )
126def get_broker_server(
127 broker: "BrokerUsecase[MsgType, ConnectionType]",
128) -> dict[str, Server]:
129 """Get the broker server for an application."""
130 specification = broker.specification
132 servers = {}
134 broker_meta: dict[str, Any] = {
135 "protocol": specification.protocol,
136 "protocolVersion": specification.protocol_version,
137 "description": specification.description,
138 "tags": [Tag.from_spec(tag) for tag in specification.tags] or None,
139 "security": specification.security.get_requirement()
140 if specification.security
141 else None,
142 # TODO
143 # "variables": "",
144 # "bindings": "",
145 }
147 single_server = len(specification.url) == 1
148 for i, url in enumerate(specification.url, 1):
149 server_name = "development" if single_server else f"Server{i}"
150 servers[server_name] = Server(url=url, **broker_meta)
152 return servers
155def get_broker_channels(
156 broker: "BrokerUsecase[MsgType, ConnectionType]",
157) -> dict[str, Channel]:
158 """Get the broker channels for an application."""
159 channels = {}
161 for s in filter(lambda s: s.specification.include_in_schema, broker.subscribers):
162 for key, sub in s.schema().items():
163 if key in channels: 163 ↛ 164line 163 didn't jump to line 164 because the condition on line 163 was never true
164 warnings.warn(
165 f"Overwrite channel handler, channels have the same names: `{key}`",
166 RuntimeWarning,
167 stacklevel=1,
168 )
170 channels[key] = Channel.from_sub(sub)
172 for p in filter(lambda p: p.specification.include_in_schema, broker.publishers):
173 for key, pub in p.schema().items():
174 if key in channels: 174 ↛ 175line 174 didn't jump to line 175 because the condition on line 174 was never true
175 warnings.warn(
176 f"Overwrite channel handler, channels have the same names: `{key}`",
177 RuntimeWarning,
178 stacklevel=1,
179 )
181 channels[key] = Channel.from_pub(pub)
183 return channels
186def get_asgi_routes(
187 http_handlers: list[tuple[str, "HttpHandler"]],
188) -> dict[str, Channel]:
189 """Get the ASGI routes for an application."""
190 channels: dict[str, Channel] = {}
191 for path, asgi_app in http_handlers:
192 if asgi_app.include_in_schema:
193 channel = Channel(
194 description=asgi_app.description,
195 subscribe=Operation(
196 tags=asgi_app.tags,
197 operationId=asgi_app.unique_id,
198 bindings=OperationBinding(
199 http=http_bindings.OperationBinding(
200 method=", ".join(asgi_app.methods),
201 ),
202 ),
203 message=None,
204 ),
205 )
207 channels[path] = channel
209 return channels
212def _resolve_msg_payloads(
213 m: Message,
214 channel_name: str,
215 payloads: dict[str, Any],
216 messages: dict[str, Any],
217) -> Reference:
218 """Replace message payload by reference and normalize payloads.
220 Payloads and messages are editable dicts to store schemas for reference in AsyncAPI.
221 """
222 one_of_list: list[Reference] = []
223 m.payload = move_pydantic_refs(m.payload, DEF_KEY)
225 if DEF_KEY in m.payload:
226 payloads.update(m.payload.pop(DEF_KEY))
228 one_of = m.payload.get("oneOf")
229 if isinstance(one_of, dict):
230 for p_title, p in one_of.items():
231 formatted_payload_title = clear_key(p_title)
232 # Promote nested Pydantic $defs from each payload into components/schemas
233 # so that referenced nested models are available globally.
234 if isinstance(p, dict) and DEF_KEY in p:
235 defs = p.pop(DEF_KEY) or {}
236 for def_name, def_schema in defs.items():
237 payloads[clear_key(def_name)] = def_schema
238 payloads.update(p.pop(DEF_KEY, {}))
239 if formatted_payload_title not in payloads: 239 ↛ 241line 239 didn't jump to line 241 because the condition on line 239 was always true
240 payloads[formatted_payload_title] = p
241 one_of_list.append(
242 Reference(**{
243 "$ref": f"#/components/schemas/{formatted_payload_title}",
244 }),
245 )
247 elif one_of is not None:
248 # Discriminator case
249 for p in one_of:
250 p_value = next(iter(p.values()))
251 p_title = p_value.split("/")[-1]
252 p_title = clear_key(p_title)
253 if p_title not in payloads: 253 ↛ 254line 253 didn't jump to line 254 because the condition on line 253 was never true
254 payloads[p_title] = p
255 one_of_list.append(Reference(**{"$ref": f"#/components/schemas/{p_title}"}))
257 if not one_of_list:
258 payloads.update(m.payload.pop(DEF_KEY, {}))
259 p_title = m.payload.get("title", f"{channel_name}Payload")
260 p_title = clear_key(p_title)
261 if p_title in payloads and payloads[p_title] != m.payload:
262 warnings.warn(
263 f"Overwriting the message schema, data types have the same name: `{p_title}`",
264 RuntimeWarning,
265 stacklevel=1,
266 )
268 payloads[p_title] = m.payload
269 m.payload = {"$ref": f"#/components/schemas/{p_title}"}
271 else:
272 m.payload["oneOf"] = one_of_list
274 assert m.title
275 message_title = clear_key(m.title)
276 messages[message_title] = m
277 return Reference(**{"$ref": f"#/components/messages/{message_title}"})