Coverage for faststream / _internal / endpoint / publisher / specification.py: 92%
30 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from inspect import Parameter, unwrap
2from typing import TYPE_CHECKING, Any, Generic
4from fast_depends.core import build_call_model
5from fast_depends.pydantic._compat import create_model, get_config_base
6from typing_extensions import TypeVar as TypeVar313
8from faststream._internal.configs import BrokerConfig, PublisherSpecificationConfig
9from faststream.specification.asyncapi.message import get_model_schema
10from faststream.specification.asyncapi.utils import to_camelcase
12if TYPE_CHECKING:
13 from faststream._internal.basic_types import AnyCallable
14 from faststream.specification.schema import PublisherSpec
17T_SpecificationConfig = TypeVar313(
18 "T_SpecificationConfig",
19 bound=PublisherSpecificationConfig,
20 default=PublisherSpecificationConfig,
21)
22T_BrokerConfig = TypeVar313("T_BrokerConfig", bound=BrokerConfig, default=BrokerConfig)
25class PublisherSpecification(Generic[T_BrokerConfig, T_SpecificationConfig]):
26 def __init__(
27 self,
28 _outer_config: "T_BrokerConfig",
29 specification_config: "T_SpecificationConfig",
30 ) -> None:
31 self.config = specification_config
32 self._outer_config = _outer_config
34 self.calls: list[AnyCallable] = []
36 def add_call(self, call: "AnyCallable") -> None:
37 self.calls.append(call)
39 @property
40 def include_in_schema(self) -> bool:
41 return bool(
42 self._outer_config.include_in_schema and self.config.include_in_schema,
43 )
45 def get_payloads(self) -> list[tuple[dict[str, Any], str]]:
46 payloads: list[tuple[dict[str, Any], str]] = []
48 if self.config.schema_:
49 body = get_model_schema(
50 call=create_model(
51 "",
52 __config__=get_config_base(),
53 response__=(self.config.schema_, ...),
54 ),
55 prefix=f"{self.name}:Message",
56 )
58 if body: # pragma: no branch
59 payloads.append((body, ""))
61 else:
62 di_state = self._outer_config.fd_config
64 for call in self.calls:
65 call_model = build_call_model(
66 call,
67 dependency_provider=di_state.provider,
68 serializer_cls=di_state._serializer,
69 )
71 if call_model.serializer: 71 ↛ 76line 71 didn't jump to line 76 because the condition on line 71 was always true
72 response_type = next(
73 iter(call_model.serializer.response_option.values()),
74 ).field_type
75 else:
76 response_type = None
78 if response_type is not None and response_type is not Parameter.empty:
79 body = get_model_schema(
80 create_model(
81 "",
82 __config__=get_config_base(),
83 response__=(response_type, ...),
84 ),
85 prefix=f"{self.name}:Message",
86 )
88 if body: 88 ↛ 64line 88 didn't jump to line 64 because the condition on line 88 was always true
89 payloads.append((body, to_camelcase(unwrap(call).__name__)))
91 return payloads
93 @property
94 def name(self) -> str:
95 raise NotImplementedError
97 def get_schema(self) -> dict[str, "PublisherSpec"]:
98 raise NotImplementedError