Coverage for faststream / specification / asyncapi / v3_0_0 / generate.py: 96%

103 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import string 

2import warnings 

3from collections.abc import Sequence 

4from typing import TYPE_CHECKING, Any, Optional, Union 

5from urllib.parse import urlparse 

6 

7from faststream._internal._compat import DEF_KEY 

8from faststream._internal.constants import ContentTypes 

9from faststream.specification.asyncapi.utils import clear_key, move_pydantic_refs 

10from faststream.specification.asyncapi.v3_0_0.schema import ( 

11 ApplicationInfo, 

12 ApplicationSchema, 

13 Channel, 

14 Components, 

15 Contact, 

16 ExternalDocs, 

17 License, 

18 Message, 

19 Operation, 

20 Reference, 

21 Server, 

22 Tag, 

23) 

24from faststream.specification.asyncapi.v3_0_0.schema.bindings import ( 

25 OperationBinding, 

26 http as http_bindings, 

27) 

28from faststream.specification.asyncapi.v3_0_0.schema.operations import Action 

29 

30if TYPE_CHECKING: 

31 from faststream._internal.basic_types import AnyHttpUrl 

32 from faststream._internal.broker import BrokerUsecase 

33 from faststream._internal.types import ConnectionType, MsgType 

34 from faststream.asgi.handlers import HttpHandler 

35 from faststream.specification.schema.extra import ( 

36 Contact as SpecContact, 

37 ContactDict, 

38 ExternalDocs as SpecDocs, 

39 ExternalDocsDict, 

40 License as SpecLicense, 

41 LicenseDict, 

42 Tag as SpecTag, 

43 TagDict, 

44 ) 

45 

46 

def get_app_schema(
    broker: "BrokerUsecase[Any, Any]",
    /,
    title: str,
    app_version: str,
    schema_version: str,
    description: str | None,
    terms_of_service: Optional["AnyHttpUrl"],
    contact: Union["SpecContact", "ContactDict", dict[str, Any]] | None,
    license: Union["SpecLicense", "LicenseDict", dict[str, Any]] | None,
    identifier: str | None,
    tags: Sequence[Union["SpecTag", "TagDict", dict[str, Any]]] | None,
    external_docs: Union["SpecDocs", "ExternalDocsDict", dict[str, Any]] | None,
    http_handlers: list[tuple[str, "HttpHandler"]],
) -> ApplicationSchema:
    """Assemble the AsyncAPI 3.0.0 application schema for a broker.

    Servers, channels and operations are collected from the broker, HTTP
    routes are merged in, and every channel message is replaced by a
    reference into ``components`` while its payload schema is registered
    under ``components.schemas``.
    """
    servers = get_broker_server(broker)
    channels, operations = get_broker_channels(broker)

    # Server references are attached to broker channels only: the HTTP
    # channels are merged in afterwards and are left without them.
    for broker_channel in channels.values():
        broker_channel.servers = [
            {"$ref": f"#/servers/{server_name}"} for server_name in servers
        ]

    http_channels, http_operations = get_asgi_routes(http_handlers)
    channels.update(http_channels)
    operations.update(http_operations)

    # Both mappings are filled in place by `_resolve_msg_payloads`.
    component_messages: dict[str, Message] = {}
    component_schemas: dict[str, dict[str, Any]] = {}

    for channel_name, channel in channels.items():
        resolved: dict[str, Message | Reference] = {}
        for message_name, message in channel.messages.items():
            assert isinstance(message, Message)

            resolved[message_name] = _resolve_msg_payloads(
                message_name,
                message,
                channel_name,
                component_schemas,
                component_messages,
            )

        channel.messages = resolved

    if tags:
        info_tags = [Tag.from_spec(tag) for tag in tags] or None
    else:
        info_tags = None

    info = ApplicationInfo(
        title=title,
        version=app_version,
        description=description,
        termsOfService=terms_of_service,
        contact=Contact.from_spec(contact),
        license=License.from_spec(license),
        tags=info_tags,
        externalDocs=ExternalDocs.from_spec(external_docs),
    )

    security = broker.specification.security
    components = Components(
        messages=component_messages,
        schemas=component_schemas,
        securitySchemes=None if security is None else security.get_schema(),
    )

    return ApplicationSchema(
        info=info,
        asyncapi=schema_version,
        defaultContentType=ContentTypes.JSON.value,
        id=identifier,
        servers=servers,
        channels=channels,
        operations=operations,
        components=components,
    )

117 

118 

def get_broker_server(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> dict[str, Server]:
    """Build the ``servers`` section from the broker specification.

    One ``Server`` entry is produced per URL in ``specification.url``. A
    single URL is published as ``development``; several URLs are published
    as ``Server1``, ``Server2``, ... in their original order.
    """
    spec = broker.specification

    server_tags: list[Tag | dict[str, Any]] | None = (
        [Tag.from_spec(tag) for tag in spec.tags] if spec.tags else None
    )

    # Attributes shared by every server entry.
    shared_fields: dict[str, Any] = {
        "protocol": spec.protocol,
        "protocolVersion": spec.protocol_version,
        "description": spec.description,
        "tags": server_tags,
        # TODO
        # "variables": "",
        # "bindings": "",
    }

    security = spec.security
    if security is not None:
        # Flatten every requirement into references to the named schemes.
        shared_fields["security"] = [
            Reference(**{"$ref": f"#/components/securitySchemes/{scheme}"})
            for requirement in security.get_requirement()
            for scheme in requirement
        ]

    urls = spec.url
    only_one = len(urls) == 1

    result = {}
    for position, raw_url in enumerate(urls, 1):
        # Without a scheme, a leading "//" makes urlparse treat the address
        # as a network location rather than as a path.
        normalized = raw_url if "://" in raw_url else f"//{raw_url}"
        parts = urlparse(normalized)

        name = "development" if only_one else f"Server{position}"
        result[name] = Server(
            host=parts.netloc,
            pathname=parts.path,
            **shared_fields,
        )

    return result

161 

162 

def get_broker_channels(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> tuple[dict[str, Channel], dict[str, Operation]]:
    """Collect channels and operations from the broker's endpoints.

    Subscribers are processed first, then publishers; endpoints whose
    specification has ``include_in_schema`` unset are skipped. A
    ``RuntimeWarning`` is emitted when a channel key (or a subscriber
    operation key) is about to be overwritten.
    """
    channels = {}
    operations = {}

    for subscriber in broker.subscribers:
        if not subscriber.specification.include_in_schema:
            continue

        for raw_key, sub_schema in subscriber.schema().items():
            channel = Channel.from_sub(raw_key, sub_schema)

            channel_key = clear_key(raw_key)
            if channel_key in channels:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{channel_key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )

            channels[channel_key] = channel

            # A missing title or a bare "/" falls back to a generated key.
            custom_title = subscriber.specification.config.title_
            if custom_title is None or custom_title == "/":
                operation_key = f"{channel_key}Subscribe"
            else:
                operation_key = custom_title

            if operation_key in operations:
                warnings.warn(
                    f"Overwrite channel handler, operations have the same names: `{operation_key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )

            channel_ref = f"#/channels/{channel_key}"
            message_refs = [
                Reference(**{"$ref": f"{channel_ref}/messages/{msg_name}"})
                for msg_name in channel.messages
            ]
            operations[operation_key] = Operation.from_sub(
                messages=message_refs,
                channel=Reference(**{"$ref": channel_ref}),
                operation=sub_schema.operation,
            )

    for publisher in broker.publishers:
        if not publisher.specification.include_in_schema:
            continue

        for raw_key, pub_schema in publisher.schema().items():
            channel = Channel.from_pub(raw_key, pub_schema)

            channel_key = clear_key(raw_key)
            if channel_key in channels:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{channel_key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )
            channels[channel_key] = channel

            # Publisher operations are keyed by the channel key itself.
            channel_ref = f"#/channels/{channel_key}"
            message_refs = [
                Reference(**{"$ref": f"{channel_ref}/messages/{msg_name}"})
                for msg_name in channel.messages
            ]
            operations[channel_key] = Operation.from_pub(
                messages=message_refs,
                channel=Reference(**{"$ref": channel_ref}),
                operation=pub_schema.operation,
            )

    return channels, operations

233 

234 

def get_asgi_routes(
    http_handlers: list[tuple[str, "HttpHandler"]],
) -> tuple[dict[str, Channel], dict[str, Operation]]:
    """Describe the HTTP handlers as AsyncAPI channels and operations.

    Handlers with ``include_in_schema`` unset are skipped. Each remaining
    handler yields one channel and one ``receive`` operation, both stored
    under the same ``<slug>:HttpChannel`` key derived from the route path.
    """
    slug_alphabet = string.ascii_letters + string.digits + "_"

    channels: dict[str, Channel] = {}
    operations: dict[str, Operation] = {}

    for path, asgi_app in http_handlers:
        if not asgi_app.include_in_schema:
            continue

        http_channel = Channel(
            description=asgi_app.description,
            address=path,
            messages={},
        )

        # Slug: outer slashes trimmed, inner slashes become underscores,
        # any character outside [A-Za-z0-9_] is dropped.
        flattened = path.strip("/").replace("/", "_")
        slug = "".join(char for char in flattened if char in slug_alphabet)
        channel_name = f"{slug}:HttpChannel"

        channels[channel_name] = http_channel
        operations[channel_name] = Operation(
            action=Action.RECEIVE,
            channel=Reference(**{"$ref": f"#/channels/{channel_name}"}),
            bindings=OperationBinding(
                http=http_bindings.OperationBinding(
                    method=_get_http_binding_method(asgi_app.methods),
                    bindingVersion="0.3.0",
                ),
            ),
        )

    return channels, operations

267 

268 

269def _get_http_binding_method(methods: Sequence[str]) -> str: 

270 return next((method for method in methods if method != "HEAD"), "HEAD") 

271 

272 

def _resolve_msg_payloads(
    message_name: str,
    m: Message,
    channel_name: str,
    payloads: dict[str, Any],
    messages: dict[str, Any],
) -> Reference:
    """Move a message and its payload schema(s) into the shared components.

    Args:
        message_name: Name of the message inside its channel.
        m: Message to process; its ``payload`` is rewritten in place.
        channel_name: Name of the channel that owns the message.
        payloads: Mapping of schema name to schema; updated in place.
        messages: Mapping of message title to message; updated in place.

    Returns:
        A reference of the form
        ``#/components/messages/<channel_name>:<message_name>``.
    """
    assert isinstance(m.payload, dict)

    m.payload = move_pydantic_refs(m.payload, DEF_KEY)

    message_name = clear_key(message_name)
    channel_name = clear_key(channel_name)

    # Top-level definitions become standalone component schemas.
    payloads.update(m.payload.pop(DEF_KEY, {}))

    one_of = m.payload.get("oneOf", None)
    if isinstance(one_of, dict):
        one_of_list = []
        processed_payloads: dict[str, dict[str, Any]] = {}
        for name, payload in one_of.items():
            # Promote nested Pydantic $defs from each payload into components/schemas
            # so that referenced nested models are available globally.
            if isinstance(payload, dict) and DEF_KEY in payload:
                defs = payload.pop(DEF_KEY) or {}
                for def_name, def_schema in defs.items():
                    payloads[clear_key(def_name)] = def_schema

            # The reference must use the same cleaned key the schema is
            # stored under, otherwise it points at a missing component.
            schema_key = clear_key(name)
            processed_payloads[schema_key] = payload
            one_of_list.append(
                Reference(**{"$ref": f"#/components/schemas/{schema_key}"}),
            )

        payloads.update(processed_payloads)
        m.payload["oneOf"] = one_of_list

    else:
        payload_name = clear_key(
            m.payload.get("title", f"{channel_name}:{message_name}:Payload"),
        )

        if payload_name in payloads and payloads[payload_name] != m.payload:
            warnings.warn(
                f"Overwriting the message schema, data types have the same name: `{payload_name}`",
                RuntimeWarning,
                stacklevel=1,
            )

        payloads[payload_name] = m.payload
        m.payload = {"$ref": f"#/components/schemas/{payload_name}"}

    assert m.title
    messages[clear_key(m.title)] = m
    return Reference(
        **{"$ref": f"#/components/messages/{channel_name}:{message_name}"},
    )