Coverage for faststream / specification / asyncapi / v2_6_0 / generate.py: 95%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import warnings 

2from collections.abc import Sequence 

3from typing import TYPE_CHECKING, Any, Optional, Union 

4 

5from faststream._internal._compat import DEF_KEY 

6from faststream._internal.constants import ContentTypes 

7from faststream.specification.asyncapi.utils import clear_key, move_pydantic_refs 

8from faststream.specification.asyncapi.v2_6_0.schema import ( 

9 ApplicationInfo, 

10 ApplicationSchema, 

11 Channel, 

12 Components, 

13 Contact, 

14 ExternalDocs, 

15 License, 

16 Message, 

17 Operation, 

18 Reference, 

19 Server, 

20 Tag, 

21) 

22from faststream.specification.asyncapi.v2_6_0.schema.bindings import ( 

23 OperationBinding, 

24 http as http_bindings, 

25) 

26 

27if TYPE_CHECKING: 

28 from faststream._internal.basic_types import AnyHttpUrl 

29 from faststream._internal.broker import BrokerUsecase 

30 from faststream._internal.types import ConnectionType, MsgType 

31 from faststream.asgi.handlers import HttpHandler 

32 from faststream.specification.schema.extra import ( 

33 Contact as SpecContact, 

34 ContactDict, 

35 ExternalDocs as SpecDocs, 

36 ExternalDocsDict, 

37 License as SpecLicense, 

38 LicenseDict, 

39 Tag as SpecTag, 

40 TagDict, 

41 ) 

42 

43 

def get_app_schema(
    broker: "BrokerUsecase[Any, Any]",
    /,
    title: str,
    app_version: str,
    schema_version: str,
    description: str | None,
    terms_of_service: Optional["AnyHttpUrl"],
    contact: Union["SpecContact", "ContactDict", dict[str, Any]] | None,
    license: Union["SpecLicense", "LicenseDict", dict[str, Any]] | None,
    identifier: str | None,
    tags: Sequence[Union["SpecTag", "TagDict", dict[str, Any]]],
    external_docs: Union["SpecDocs", "ExternalDocsDict", dict[str, Any]] | None,
    http_handlers: list[tuple[str, "HttpHandler"]],
) -> ApplicationSchema:
    """Assemble the application schema for a broker and its HTTP handlers.

    Servers and channels are collected from ``broker``. Every broker channel is
    bound to all collected servers, the HTTP handlers are merged in as extra
    channels, and the messages of each channel are then replaced by references
    into the shared ``components`` section.
    """
    servers = get_broker_server(broker)
    channels = get_broker_channels(broker)

    # Only broker channels receive the server list; the ASGI routes are merged
    # in afterwards and keep whatever `servers` value they were created with.
    for broker_channel in channels.values():
        broker_channel.servers = [*servers]
    channels.update(get_asgi_routes(http_handlers))

    # Both mappings are filled in place while the channels are resolved.
    component_messages: dict[str, Message] = {}
    component_schemas: dict[str, dict[str, Any]] = {}
    for name, channel in channels.items():
        resolve_channel_messages(
            channel,
            name,
            component_schemas,
            component_messages,
        )

    info = ApplicationInfo(
        title=title,
        version=app_version,
        description=description,
        termsOfService=terms_of_service,
        contact=Contact.from_spec(contact),
        license=License.from_spec(license),
    )
    # An empty tag list is reported as `None` rather than `[]`.
    schema_tags = [Tag.from_spec(tag) for tag in tags] or None
    docs = ExternalDocs.from_spec(external_docs)

    security = broker.specification.security
    if security is None:
        security_schemes = None
    else:
        security_schemes = security.get_schema()

    components = Components(
        messages=component_messages,
        schemas=component_schemas,
        securitySchemes=security_schemes,
    )

    return ApplicationSchema(
        info=info,
        tags=schema_tags,
        externalDocs=docs,
        asyncapi=schema_version,
        defaultContentType=ContentTypes.JSON.value,
        id=identifier,
        servers=servers,
        channels=channels,
        components=components,
    )

97 

98 

def resolve_channel_messages(
    channel: Channel,
    channel_name: str,
    payloads: dict[str, dict[str, Any]],
    messages: dict[str, Message],
) -> None:
    """Swap the inline messages of a channel's operations for references.

    The subscribe operation is processed only when it carries a message; the
    publish operation is processed whenever it exists. ``payloads`` and
    ``messages`` are filled in place by ``_resolve_msg_payloads``.
    """
    subscribe_op = channel.subscribe
    # A subscribe operation may have no message (the ASGI routes are built
    # with `message=None`), so a falsy message is skipped.
    if subscribe_op is not None and subscribe_op.message:
        assert isinstance(subscribe_op.message, Message)

        subscribe_op.message = _resolve_msg_payloads(
            subscribe_op.message,
            channel_name,
            payloads,
            messages,
        )

    publish_op = channel.publish
    if publish_op is None:
        return

    assert isinstance(publish_op.message, Message)

    publish_op.message = _resolve_msg_payloads(
        publish_op.message,
        channel_name,
        payloads,
        messages,
    )

124 

125 

def get_broker_server(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> dict[str, Server]:
    """Build one server entry per URL of the broker specification.

    A single URL is registered under the name ``"development"``; several URLs
    are registered as ``"Server1"``, ``"Server2"``, ... in iteration order.
    All entries share the same protocol, description, tags and security data.
    """
    spec = broker.specification

    # Fields that are identical for every server of this broker.
    shared_fields: dict[str, Any] = {
        "protocol": spec.protocol,
        "protocolVersion": spec.protocol_version,
        "description": spec.description,
        "tags": [Tag.from_spec(tag) for tag in spec.tags] or None,
        "security": spec.security.get_requirement() if spec.security else None,
        # TODO
        # "variables": "",
        # "bindings": "",
    }

    only_one_url = len(spec.url) == 1
    return {
        ("development" if only_one_url else f"Server{index}"): Server(
            url=url,
            **shared_fields,
        )
        for index, url in enumerate(spec.url, start=1)
    }

153 

154 

def get_broker_channels(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> dict[str, Channel]:
    """Collect the channels of all schema-visible subscribers and publishers.

    Subscribers are processed first, then publishers. Handlers whose
    specification has a falsy ``include_in_schema`` are skipped. When a channel
    name is seen again, a ``RuntimeWarning`` is emitted and the later channel
    replaces the earlier one.
    """
    collected: dict[str, Channel] = {}

    for subscriber in broker.subscribers:
        if not subscriber.specification.include_in_schema:
            continue

        for key, sub_spec in subscriber.schema().items():
            if key in collected:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )

            collected[key] = Channel.from_sub(sub_spec)

    for publisher in broker.publishers:
        if not publisher.specification.include_in_schema:
            continue

        for key, pub_spec in publisher.schema().items():
            if key in collected:
                warnings.warn(
                    f"Overwrite channel handler, channels have the same names: `{key}`",
                    RuntimeWarning,
                    stacklevel=1,
                )

            collected[key] = Channel.from_pub(pub_spec)

    return collected

184 

185 

def get_asgi_routes(
    http_handlers: list[tuple[str, "HttpHandler"]],
) -> dict[str, Channel]:
    """Describe the schema-visible HTTP handlers as channels keyed by path.

    Each included handler becomes a channel with a single subscribe operation
    that has no message and an HTTP binding whose ``method`` is the handler's
    methods joined with ``", "``. Handlers with a falsy ``include_in_schema``
    are left out; if a path repeats, the last handler for it wins.
    """
    return {
        route: Channel(
            description=handler.description,
            subscribe=Operation(
                tags=handler.tags,
                operationId=handler.unique_id,
                bindings=OperationBinding(
                    http=http_bindings.OperationBinding(
                        method=", ".join(handler.methods),
                    ),
                ),
                message=None,
            ),
        )
        for route, handler in http_handlers
        if handler.include_in_schema
    }

210 

211 

def _resolve_msg_payloads(
    m: Message,
    channel_name: str,
    payloads: dict[str, Any],
    messages: dict[str, Any],
) -> Reference:
    """Replace message payload by reference and normalize payloads.

    Payloads and messages are editable dicts to store schemas for reference in AsyncAPI.

    Args:
        m: Message whose ``payload`` is rewritten in place.
        channel_name: Used for the fallback schema name
            ``"<channel_name>Payload"`` when the payload has no ``title``.
        payloads: Shared schemas mapping, updated in place.
        messages: Shared messages mapping, updated in place.

    Returns:
        A reference pointing at the entry stored for ``m`` in ``messages``.
    """
    m.payload = move_pydantic_refs(m.payload, DEF_KEY)

    # Hoist the payload's own definitions into the shared schemas. The key is
    # removed here once, so no later branch needs to pop it again.
    if DEF_KEY in m.payload:
        payloads.update(m.payload.pop(DEF_KEY))

    one_of_list: list[Reference] = []
    one_of = m.payload.get("oneOf")

    if isinstance(one_of, dict):
        # Mapping form: schema title -> schema body.
        for p_title, p in one_of.items():
            formatted_payload_title = clear_key(p_title)
            # Promote nested Pydantic $defs from each payload into components/schemas
            # so that referenced nested models are available globally.
            if isinstance(p, dict) and DEF_KEY in p:
                defs = p.pop(DEF_KEY) or {}
                for def_name, def_schema in defs.items():
                    payloads[clear_key(def_name)] = def_schema
            # The first schema registered under a title is kept.
            if formatted_payload_title not in payloads:
                payloads[formatted_payload_title] = p
            one_of_list.append(
                Reference(**{
                    "$ref": f"#/components/schemas/{formatted_payload_title}",
                }),
            )

    elif one_of is not None:
        # Discriminator case
        for p in one_of:
            # The schema name is the last path segment of the entry's first value.
            p_value = next(iter(p.values()))
            p_title = clear_key(p_value.split("/")[-1])
            if p_title not in payloads:
                payloads[p_title] = p
            one_of_list.append(Reference(**{"$ref": f"#/components/schemas/{p_title}"}))

    if one_of_list:
        m.payload["oneOf"] = one_of_list

    else:
        # Single payload: store it under its title and leave a reference behind.
        p_title = clear_key(m.payload.get("title", f"{channel_name}Payload"))
        if p_title in payloads and payloads[p_title] != m.payload:
            warnings.warn(
                f"Overwriting the message schema, data types have the same name: `{p_title}`",
                RuntimeWarning,
                stacklevel=1,
            )

        payloads[p_title] = m.payload
        m.payload = {"$ref": f"#/components/schemas/{p_title}"}

    # The message is registered under its title, so it must have one.
    assert m.title
    message_title = clear_key(m.title)
    messages[message_title] = m
    return Reference(**{"$ref": f"#/components/messages/{message_title}"})

277 return Reference(**{"$ref": f"#/components/messages/{message_title}"})