Coverage for faststream / specification / asyncapi / factory.py: 97%

29 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Sequence 

2from typing import TYPE_CHECKING, Any, Literal, Optional, Union 

3 

4from faststream.specification.base import Specification, SpecificationFactory 

5 

6if TYPE_CHECKING: 

7 from faststream._internal.basic_types import AnyHttpUrl 

8 from faststream._internal.broker import BrokerUsecase 

9 from faststream.asgi.handlers import HttpHandler 

10 from faststream.specification.schema import Contact, ExternalDocs, License, Tag 

11 

12 

13class AsyncAPI(SpecificationFactory): 

14 def __init__( 

15 self, 

16 broker: Optional["BrokerUsecase[Any, Any]"] = None, 

17 /, 

18 title: str = "FastStream", 

19 version: str = "0.1.0", 

20 description: str | None = None, 

21 terms_of_service: Optional["AnyHttpUrl"] = None, 

22 license: Union["License", "dict[str, Any]"] | None = None, 

23 contact: Union["Contact", "dict[str, Any]"] | None = None, 

24 tags: Sequence[Union["Tag", "dict[str, Any]"]] = (), 

25 external_docs: Union["ExternalDocs", "dict[str, Any]"] | None = None, 

26 identifier: str | None = None, 

27 schema_version: Literal["3.0.0", "2.6.0"] | str = "3.0.0", 

28 ) -> None: 

29 self.title = title 

30 self.version = version 

31 self.description = description 

32 self.terms_of_service = terms_of_service 

33 self.license = license 

34 self.contact = contact 

35 self.tags = tags 

36 self.external_docs = external_docs 

37 self.identifier = identifier 

38 self.schema_version = schema_version 

39 

40 self.brokers: list[BrokerUsecase[Any, Any]] = [] 

41 if broker: 

42 self.add_broker(broker) 

43 

44 self.http_handlers: list[tuple[str, HttpHandler]] = [] 

45 

46 def add_broker( 

47 self, 

48 broker: "BrokerUsecase[Any, Any]", 

49 /, 

50 ) -> "SpecificationFactory": 

51 if broker not in self.brokers: 

52 self.brokers.append(broker) 

53 return self 

54 

55 def add_http_route( 

56 self, 

57 path: str, 

58 handler: "HttpHandler", 

59 ) -> "SpecificationFactory": 

60 self.http_handlers.append((path, handler)) 

61 return self 

62 

63 def to_specification(self) -> Specification: 

64 if self.schema_version.startswith("3."): 

65 from .v3_0_0 import get_app_schema as schema_3_0 

66 

67 return schema_3_0( 

68 self.brokers[0], 

69 title=self.title, 

70 app_version=self.version, 

71 schema_version=self.schema_version, 

72 description=self.description, 

73 terms_of_service=self.terms_of_service, 

74 contact=self.contact, 

75 license=self.license, 

76 identifier=self.identifier, 

77 tags=self.tags, 

78 external_docs=self.external_docs, 

79 http_handlers=self.http_handlers, 

80 ) 

81 

82 if self.schema_version.startswith("2.6."): 

83 from .v2_6_0 import get_app_schema as schema_2_6 

84 

85 return schema_2_6( 

86 self.brokers[0], 

87 title=self.title, 

88 app_version=self.version, 

89 schema_version=self.schema_version, 

90 description=self.description, 

91 terms_of_service=self.terms_of_service, 

92 contact=self.contact, 

93 license=self.license, 

94 identifier=self.identifier, 

95 tags=self.tags, 

96 external_docs=self.external_docs, 

97 http_handlers=self.http_handlers, 

98 ) 

99 

100 msg = f"Unsupported schema version: {self.schema_version}" 

101 raise NotImplementedError(msg)