Coverage for faststream / _internal / endpoint / publisher / specification.py: 92%

30 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from inspect import Parameter, unwrap 

2from typing import TYPE_CHECKING, Any, Generic 

3 

4from fast_depends.core import build_call_model 

5from fast_depends.pydantic._compat import create_model, get_config_base 

6from typing_extensions import TypeVar as TypeVar313 

7 

8from faststream._internal.configs import BrokerConfig, PublisherSpecificationConfig 

9from faststream.specification.asyncapi.message import get_model_schema 

10from faststream.specification.asyncapi.utils import to_camelcase 

11 

12if TYPE_CHECKING: 

13 from faststream._internal.basic_types import AnyCallable 

14 from faststream.specification.schema import PublisherSpec 

15 

16 

17T_SpecificationConfig = TypeVar313( 

18 "T_SpecificationConfig", 

19 bound=PublisherSpecificationConfig, 

20 default=PublisherSpecificationConfig, 

21) 

22T_BrokerConfig = TypeVar313("T_BrokerConfig", bound=BrokerConfig, default=BrokerConfig) 

23 

24 

25class PublisherSpecification(Generic[T_BrokerConfig, T_SpecificationConfig]): 

26 def __init__( 

27 self, 

28 _outer_config: "T_BrokerConfig", 

29 specification_config: "T_SpecificationConfig", 

30 ) -> None: 

31 self.config = specification_config 

32 self._outer_config = _outer_config 

33 

34 self.calls: list[AnyCallable] = [] 

35 

36 def add_call(self, call: "AnyCallable") -> None: 

37 self.calls.append(call) 

38 

39 @property 

40 def include_in_schema(self) -> bool: 

41 return bool( 

42 self._outer_config.include_in_schema and self.config.include_in_schema, 

43 ) 

44 

45 def get_payloads(self) -> list[tuple[dict[str, Any], str]]: 

46 payloads: list[tuple[dict[str, Any], str]] = [] 

47 

48 if self.config.schema_: 

49 body = get_model_schema( 

50 call=create_model( 

51 "", 

52 __config__=get_config_base(), 

53 response__=(self.config.schema_, ...), 

54 ), 

55 prefix=f"{self.name}:Message", 

56 ) 

57 

58 if body: # pragma: no branch 

59 payloads.append((body, "")) 

60 

61 else: 

62 di_state = self._outer_config.fd_config 

63 

64 for call in self.calls: 

65 call_model = build_call_model( 

66 call, 

67 dependency_provider=di_state.provider, 

68 serializer_cls=di_state._serializer, 

69 ) 

70 

71 if call_model.serializer: 71 ↛ 76line 71 didn't jump to line 76 because the condition on line 71 was always true

72 response_type = next( 

73 iter(call_model.serializer.response_option.values()), 

74 ).field_type 

75 else: 

76 response_type = None 

77 

78 if response_type is not None and response_type is not Parameter.empty: 

79 body = get_model_schema( 

80 create_model( 

81 "", 

82 __config__=get_config_base(), 

83 response__=(response_type, ...), 

84 ), 

85 prefix=f"{self.name}:Message", 

86 ) 

87 

88 if body: 88 ↛ 64line 88 didn't jump to line 64 because the condition on line 88 was always true

89 payloads.append((body, to_camelcase(unwrap(call).__name__))) 

90 

91 return payloads 

92 

93 @property 

94 def name(self) -> str: 

95 raise NotImplementedError 

96 

97 def get_schema(self) -> dict[str, "PublisherSpec"]: 

98 raise NotImplementedError