Coverage for faststream/_internal/endpoint/subscriber/specification.py: 91%
28 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from abc import abstractmethod
2from typing import TYPE_CHECKING, Any, Generic
4from typing_extensions import (
5 TypeVar as TypeVar313,
6)
8from faststream._internal.configs import BrokerConfig, SubscriberSpecificationConfig
9from faststream.exceptions import SetupError
10from faststream.specification.asyncapi.message import parse_handler_params
11from faststream.specification.asyncapi.utils import to_camelcase
13if TYPE_CHECKING:
14 from faststream._internal.endpoint.subscriber.call_item import (
15 CallsCollection,
16 )
17 from faststream.specification.schema import SubscriberSpec
# Type parameter for the subscriber-level specification config. It is bounded
# by SubscriberSpecificationConfig and falls back to that same class when the
# generic is left unparameterized.
T_SpecificationConfig = TypeVar313(
    "T_SpecificationConfig",
    bound=SubscriberSpecificationConfig,
    default=SubscriberSpecificationConfig,
)
# Type parameter for the broker-level config; bounded by BrokerConfig and
# defaulting to it when the generic is left unparameterized.
T_BrokerConfig = TypeVar313("T_BrokerConfig", bound=BrokerConfig, default=BrokerConfig)
class SubscriberSpecification(Generic[T_BrokerConfig, T_SpecificationConfig]):
    """Base class describing a subscriber for specification generation.

    It stores the broker-level config, the subscriber's own specification
    config and the collection of handler calls, and derives schema-related
    values from them. Subclasses are expected to provide ``name`` and
    ``get_schema``.

    NOTE(review): the class does not use ``ABCMeta`` in the visible code, so
    ``@abstractmethod`` is not enforced at instantiation time here; the
    ``NotImplementedError`` bodies act as the runtime guard.
    """

    def __init__(
        self,
        _outer_config: "T_BrokerConfig",
        specification_config: "T_SpecificationConfig",
        calls: "CallsCollection[Any]",
    ) -> None:
        """Store the broker config, specification config and handler calls."""
        self._outer_config = _outer_config
        self.config = specification_config
        self.calls = calls

    @property
    def include_in_schema(self) -> bool:
        """Whether both the broker config and the subscriber config opt in."""
        if not self._outer_config.include_in_schema:
            return False
        return bool(self.config.include_in_schema)

    @property
    def description(self) -> str | None:
        """Configured description, falling back to the calls' description."""
        configured = self.config.description_
        return configured if configured else self.calls.description

    @property
    def call_name(self) -> str:
        """Name taken from the calls collection, or ``"Subscriber"`` if falsy."""
        handler_name = self.calls.name
        return handler_name if handler_name else "Subscriber"

    def get_payloads(self) -> list[tuple["dict[str, Any]", str]]:
        """Collect ``(payload schema, camel-cased name)`` pairs for the calls.

        Every call contributes one pair built from its ``dependant``. When the
        calls collection is falsy, a single placeholder pair carrying only a
        ``title`` is appended instead.

        Raises:
            SetupError: If a call has no ``dependant`` yet.
        """
        fallback_name = self.call_name
        collected: list[tuple[dict[str, Any], str]] = []

        for handler in self.calls:
            dependant = handler.dependant
            if dependant is None:
                msg = "You should setup `Handler` at first."
                raise SetupError(msg)

            title = self.config.title_ or fallback_name
            schema = parse_handler_params(dependant, prefix=f"{title}:Message")
            collected.append((schema, to_camelcase(handler.name)))

        if self.calls:
            return collected

        # No calls registered: emit a title-only placeholder payload.
        title = self.config.title_ or fallback_name
        placeholder = {"title": f"{title}:Message:Payload"}
        collected.append((placeholder, to_camelcase(fallback_name)))
        return collected

    @property
    @abstractmethod
    def name(self) -> str:
        """Subscriber name; must be provided by subclasses."""
        raise NotImplementedError

    @abstractmethod
    def get_schema(self) -> dict[str, "SubscriberSpec"]:
        """Mapping of names to ``SubscriberSpec``; must be provided by subclasses."""
        raise NotImplementedError