Coverage for faststream / specification / asyncapi / v3_0_0 / generate.py: 96%
103 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import string
2import warnings
3from collections.abc import Sequence
4from typing import TYPE_CHECKING, Any, Optional, Union
5from urllib.parse import urlparse
7from faststream._internal._compat import DEF_KEY
8from faststream._internal.constants import ContentTypes
9from faststream.specification.asyncapi.utils import clear_key, move_pydantic_refs
10from faststream.specification.asyncapi.v3_0_0.schema import (
11 ApplicationInfo,
12 ApplicationSchema,
13 Channel,
14 Components,
15 Contact,
16 ExternalDocs,
17 License,
18 Message,
19 Operation,
20 Reference,
21 Server,
22 Tag,
23)
24from faststream.specification.asyncapi.v3_0_0.schema.bindings import (
25 OperationBinding,
26 http as http_bindings,
27)
28from faststream.specification.asyncapi.v3_0_0.schema.operations import Action
30if TYPE_CHECKING:
31 from faststream._internal.basic_types import AnyHttpUrl
32 from faststream._internal.broker import BrokerUsecase
33 from faststream._internal.types import ConnectionType, MsgType
34 from faststream.asgi.handlers import HttpHandler
35 from faststream.specification.schema.extra import (
36 Contact as SpecContact,
37 ContactDict,
38 ExternalDocs as SpecDocs,
39 ExternalDocsDict,
40 License as SpecLicense,
41 LicenseDict,
42 Tag as SpecTag,
43 TagDict,
44 )
def get_app_schema(
    broker: "BrokerUsecase[Any, Any]",
    /,
    title: str,
    app_version: str,
    schema_version: str,
    description: str | None,
    terms_of_service: Optional["AnyHttpUrl"],
    contact: Union["SpecContact", "ContactDict", dict[str, Any]] | None,
    license: Union["SpecLicense", "LicenseDict", dict[str, Any]] | None,
    identifier: str | None,
    tags: Sequence[Union["SpecTag", "TagDict", dict[str, Any]]] | None,
    external_docs: Union["SpecDocs", "ExternalDocsDict", dict[str, Any]] | None,
    http_handlers: list[tuple[str, "HttpHandler"]],
) -> ApplicationSchema:
    """Build the complete application schema for a broker and its HTTP handlers.

    The broker's servers, channels and operations are collected first, then the
    ASGI routes from ``http_handlers`` are merged in.  Every message attached to
    a channel is replaced by a reference, while the message itself and its
    payload schemas are gathered into the ``components`` section.

    Args:
        broker: Broker whose specification, subscribers and publishers are
            described.
        title: Value for ``info.title``.
        app_version: Value for ``info.version``.
        schema_version: Value for the top-level ``asyncapi`` field.
        description: Value for ``info.description``.
        terms_of_service: Value for ``info.termsOfService``.
        contact: Contact specification, converted with ``Contact.from_spec``.
        license: License specification, converted with ``License.from_spec``.
        identifier: Value for the top-level ``id`` field.
        tags: Tag specifications, each converted with ``Tag.from_spec``; an
            empty or missing sequence results in ``None``.
        external_docs: External docs specification, converted with
            ``ExternalDocs.from_spec``.
        http_handlers: ``(path, handler)`` pairs passed to ``get_asgi_routes``.

    Returns:
        The assembled ``ApplicationSchema``.
    """
    servers = get_broker_server(broker)
    channels, operations = get_broker_channels(broker)

    # Only the broker channels are linked to the servers: the ASGI channels are
    # merged in afterwards and therefore keep their own ``servers`` value.
    for broker_channel in channels.values():
        broker_channel.servers = [
            {"$ref": f"#/servers/{server_name}"} for server_name in servers
        ]

    asgi_channels, asgi_operations = get_asgi_routes(http_handlers)
    channels.update(asgi_channels)
    operations.update(asgi_operations)

    # Shared registries filled in by ``_resolve_msg_payloads``.
    messages: dict[str, Message] = {}
    payloads: dict[str, dict[str, Any]] = {}

    for channel_name, channel in channels.items():
        resolved: dict[str, Message | Reference] = {}
        for message_name, message in channel.messages.items():
            assert isinstance(message, Message)
            resolved[message_name] = _resolve_msg_payloads(
                message_name,
                message,
                channel_name,
                payloads,
                messages,
            )
        channel.messages = resolved

    info_contact = Contact.from_spec(contact)
    info_license = License.from_spec(license)
    info_tags = None
    if tags:
        info_tags = [Tag.from_spec(tag) for tag in tags] or None
    info_docs = ExternalDocs.from_spec(external_docs)

    info = ApplicationInfo(
        title=title,
        version=app_version,
        description=description,
        termsOfService=terms_of_service,
        contact=info_contact,
        license=info_license,
        tags=info_tags,
        externalDocs=info_docs,
    )

    if broker.specification.security is None:
        security_schemes = None
    else:
        security_schemes = broker.specification.security.get_schema()

    components = Components(
        messages=messages,
        schemas=payloads,
        securitySchemes=security_schemes,
    )

    return ApplicationSchema(
        info=info,
        asyncapi=schema_version,
        defaultContentType=ContentTypes.JSON.value,
        id=identifier,
        servers=servers,
        channels=channels,
        operations=operations,
        components=components,
    )
def get_broker_server(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> dict[str, Server]:
    """Describe every URL of the broker specification as a ``Server`` entry.

    All servers share the protocol, protocol version, description, tags and
    (when the specification defines it) security requirements; only the host
    and pathname differ.  A broker with exactly one URL gets a single server
    named ``"development"``; otherwise servers are named ``Server1``,
    ``Server2``, ... in URL order.

    Args:
        broker: Broker whose ``specification`` is read.

    Returns:
        Mapping of server name to ``Server``.
    """
    spec = broker.specification

    server_tags: list[Tag | dict[str, Any]] | None = None
    if spec.tags:
        server_tags = [Tag.from_spec(tag) for tag in spec.tags]

    # Keyword arguments common to every server.
    # TODO: "variables" and "bindings" are not filled in yet.
    shared_fields: dict[str, Any] = {
        "protocol": spec.protocol,
        "protocolVersion": spec.protocol_version,
        "description": spec.description,
        "tags": server_tags,
    }

    if spec.security is not None:
        requirements = []
        for requirement in spec.security.get_requirement():
            for scheme_name in requirement:
                requirements.append(
                    Reference(
                        **{"$ref": f"#/components/securitySchemes/{scheme_name}"},
                    ),
                )
        shared_fields["security"] = requirements

    only_one_url = len(spec.url) == 1

    result = {}
    for index, raw_url in enumerate(spec.url, 1):
        # ``urlparse`` only fills ``netloc`` when the URL has a scheme or starts
        # with "//", so scheme-less URLs get the "//" prefix.
        if "://" in raw_url:
            normalized = raw_url
        else:
            normalized = f"//{raw_url}"
        parts = urlparse(normalized)

        name = "development" if only_one_url else f"Server{index}"
        result[name] = Server(
            host=parts.netloc,
            pathname=parts.path,
            **shared_fields,
        )

    return result
def get_broker_channels(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> tuple[dict[str, Channel], dict[str, Operation]]:
    """Collect channels and operations from the broker's subscribers and publishers.

    Endpoints whose ``specification.include_in_schema`` is falsy are skipped.
    Subscribers are processed before publishers.  A ``RuntimeWarning`` is
    issued when a channel key (or, for subscribers, an operation key) is
    already present; the newer entry then replaces the older one.

    Args:
        broker: Broker whose ``subscribers`` and ``publishers`` are inspected.

    Returns:
        A ``(channels, operations)`` pair of dictionaries.
    """
    channels = {}
    operations = {}

    for sub in broker.subscribers:
        if not sub.specification.include_in_schema:
            continue

        for sub_key, sub_channel in sub.schema().items():
            channel_obj = Channel.from_sub(sub_key, sub_channel)

            channel_key = clear_key(sub_key)
            if channel_key in channels:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{channel_key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )
            channels[channel_key] = channel_obj

            # A missing title or a bare "/" falls back to a key derived from
            # the channel.
            if (
                sub.specification.config.title_ is None
                or sub.specification.config.title_ == "/"
            ):
                operation_key = f"{channel_key}Subscribe"
            else:
                operation_key = sub.specification.config.title_

            if operation_key in operations:
                warnings.warn(
                    f"Overwrite channel handler, operations have the same names: `{operation_key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )

            message_refs = []
            for msg_name in channel_obj.messages:
                message_refs.append(
                    Reference(**{
                        "$ref": f"#/channels/{channel_key}/messages/{msg_name}",
                    }),
                )
            channel_ref = Reference(**{"$ref": f"#/channels/{channel_key}"})
            operations[operation_key] = Operation.from_sub(
                messages=message_refs,
                channel=channel_ref,
                operation=sub_channel.operation,
            )

    for pub in broker.publishers:
        if not pub.specification.include_in_schema:
            continue

        for pub_key, pub_channel in pub.schema().items():
            channel_obj = Channel.from_pub(pub_key, pub_channel)

            channel_key = clear_key(pub_key)
            if channel_key in channels:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{channel_key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )
            channels[channel_key] = channel_obj

            message_refs = []
            for msg_name in channel_obj.messages:
                message_refs.append(
                    Reference(**{
                        "$ref": f"#/channels/{channel_key}/messages/{msg_name}",
                    }),
                )
            channel_ref = Reference(**{"$ref": f"#/channels/{channel_key}"})
            # Publisher operations are keyed by the channel key itself.
            operations[channel_key] = Operation.from_pub(
                messages=message_refs,
                channel=channel_ref,
                operation=pub_channel.operation,
            )

    return channels, operations
def get_asgi_routes(
    http_handlers: list[tuple[str, "HttpHandler"]],
) -> tuple[dict[str, Channel], dict[str, Operation]]:
    """Describe the HTTP handlers as channels and ``receive`` operations.

    Handlers whose ``include_in_schema`` is falsy are skipped.  The channel name
    is derived from the path: surrounding slashes are stripped, inner slashes
    become underscores, every character other than an ASCII letter, digit or
    underscore is dropped, and ``":HttpChannel"`` is appended.  The channel and
    its operation are stored under that same name.

    Args:
        http_handlers: ``(path, handler)`` pairs.

    Returns:
        A ``(channels, operations)`` pair of dictionaries.
    """
    allowed_chars = string.ascii_letters + string.digits + "_"

    channels: dict[str, Channel] = {}
    operations: dict[str, Operation] = {}

    for path, asgi_app in http_handlers:
        if not asgi_app.include_in_schema:
            continue

        http_channel = Channel(
            description=asgi_app.description,
            address=path,
            messages={},
        )

        flattened_path = path.strip("/").replace("/", "_")
        slug = "".join(filter(allowed_chars.__contains__, flattened_path))
        channel_name = f"{slug}:HttpChannel"
        channels[channel_name] = http_channel

        channel_ref = Reference(**{"$ref": f"#/channels/{channel_name}"})
        http_binding = http_bindings.OperationBinding(
            method=_get_http_binding_method(asgi_app.methods),
            bindingVersion="0.3.0",
        )
        operations[channel_name] = Operation(
            action=Action.RECEIVE,
            channel=channel_ref,
            bindings=OperationBinding(http=http_binding),
        )

    return channels, operations
269def _get_http_binding_method(methods: Sequence[str]) -> str:
270 return next((method for method in methods if method != "HEAD"), "HEAD")
def _resolve_msg_payloads(
    message_name: str,
    m: Message,
    channel_name: str,
    payloads: dict[str, Any],
    messages: dict[str, Any],
) -> Reference:
    """Move a message and its payload schemas into the shared component registries.

    The payload of ``m`` is rewritten in place so that it only holds references
    into ``#/components/schemas``.  The schemas themselves are stored in
    ``payloads`` and the message is stored in ``messages`` under its title
    normalized with ``clear_key``.

    Args:
        message_name: Name of the message inside its channel.
        m: Message to process; its ``payload`` must be a dict and its ``title``
            must be set.
        channel_name: Name of the channel that owns the message.
        payloads: Registry of payload schemas, updated in place.
        messages: Registry of messages, updated in place.

    Returns:
        A reference of the form
        ``#/components/messages/<channel_name>:<message_name>``, with both
        names normalized with ``clear_key``.
    """
    assert isinstance(m.payload, dict)

    m.payload = move_pydantic_refs(m.payload, DEF_KEY)

    message_name = clear_key(message_name)
    channel_name = clear_key(channel_name)

    # Top-level definitions become shared schemas.
    if DEF_KEY in m.payload:
        payloads.update(m.payload.pop(DEF_KEY))

    one_of = m.payload.get("oneOf", None)
    if isinstance(one_of, dict):
        one_of_list = []
        processed_payloads: dict[str, dict[str, Any]] = {}
        for name, payload in one_of.items():
            # Promote nested Pydantic $defs from each payload into
            # components/schemas so that referenced nested models are
            # available globally.
            if isinstance(payload, dict) and DEF_KEY in payload:
                defs = payload.pop(DEF_KEY) or {}
                for def_name, def_schema in defs.items():
                    payloads[clear_key(def_name)] = def_schema

            # The reference must use the same normalized key the schema is
            # registered under; otherwise it points at a schema that does not
            # exist whenever ``clear_key`` alters the name.
            schema_key = clear_key(name)
            processed_payloads[schema_key] = payload
            one_of_list.append(
                Reference(**{"$ref": f"#/components/schemas/{schema_key}"}),
            )

        payloads.update(processed_payloads)
        m.payload["oneOf"] = one_of_list
    else:
        payload_name = m.payload.get(
            "title",
            f"{channel_name}:{message_name}:Payload",
        )
        payload_name = clear_key(payload_name)

        if payload_name in payloads and payloads[payload_name] != m.payload:
            warnings.warn(
                f"Overwriting the message schema, data types have the same name: `{payload_name}`",
                RuntimeWarning,
                stacklevel=1,
            )

        payloads[payload_name] = m.payload
        m.payload = {"$ref": f"#/components/schemas/{payload_name}"}

    assert m.title
    messages[clear_key(m.title)] = m
    return Reference(
        **{"$ref": f"#/components/messages/{channel_name}:{message_name}"},
    )