Coverage for faststream / _internal / endpoint / subscriber / specification.py: 91%

28 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from abc import abstractmethod 

2from typing import TYPE_CHECKING, Any, Generic 

3 

4from typing_extensions import ( 

5 TypeVar as TypeVar313, 

6) 

7 

8from faststream._internal.configs import BrokerConfig, SubscriberSpecificationConfig 

9from faststream.exceptions import SetupError 

10from faststream.specification.asyncapi.message import parse_handler_params 

11from faststream.specification.asyncapi.utils import to_camelcase 

12 

13if TYPE_CHECKING: 

14 from faststream._internal.endpoint.subscriber.call_item import ( 

15 CallsCollection, 

16 ) 

17 from faststream.specification.schema import SubscriberSpec 

18 

19 

20T_SpecificationConfig = TypeVar313( 

21 "T_SpecificationConfig", 

22 bound=SubscriberSpecificationConfig, 

23 default=SubscriberSpecificationConfig, 

24) 

25T_BrokerConfig = TypeVar313("T_BrokerConfig", bound=BrokerConfig, default=BrokerConfig) 

26 

27 

28class SubscriberSpecification(Generic[T_BrokerConfig, T_SpecificationConfig]): 

29 def __init__( 

30 self, 

31 _outer_config: "T_BrokerConfig", 

32 specification_config: "T_SpecificationConfig", 

33 calls: "CallsCollection[Any]", 

34 ) -> None: 

35 self.calls = calls 

36 self.config = specification_config 

37 self._outer_config = _outer_config 

38 

39 @property 

40 def include_in_schema(self) -> bool: 

41 return bool( 

42 self._outer_config.include_in_schema and self.config.include_in_schema, 

43 ) 

44 

45 @property 

46 def description(self) -> str | None: 

47 return self.config.description_ or self.calls.description 

48 

49 @property 

50 def call_name(self) -> str: 

51 return self.calls.name or "Subscriber" 

52 

53 def get_payloads(self) -> list[tuple["dict[str, Any]", str]]: 

54 payloads: list[tuple[dict[str, Any], str]] = [] 

55 

56 call_name = self.call_name 

57 

58 for h in self.calls: 

59 if h.dependant is None: 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true

60 msg = "You should setup `Handler` at first." 

61 raise SetupError(msg) 

62 

63 body = parse_handler_params( 

64 h.dependant, 

65 prefix=f"{self.config.title_ or call_name}:Message", 

66 ) 

67 payloads.append((body, to_camelcase(h.name))) 

68 

69 if not self.calls: 

70 payloads.append( 

71 ( 

72 { 

73 "title": f"{self.config.title_ or call_name}:Message:Payload", 

74 }, 

75 to_camelcase(call_name), 

76 ), 

77 ) 

78 

79 return payloads 

80 

81 @property 

82 @abstractmethod 

83 def name(self) -> str: 

84 raise NotImplementedError 

85 

86 @abstractmethod 

87 def get_schema(self) -> dict[str, "SubscriberSpec"]: 

88 raise NotImplementedError