Coverage for faststream / specification / asyncapi / message.py: 96%

50 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Sequence 

2from inspect import isclass 

3from typing import TYPE_CHECKING, Any, Optional, cast, overload 

4 

5from pydantic import BaseModel, create_model 

6 

7from faststream._internal._compat import ( 

8 DEF_KEY, 

9 PYDANTIC_V2, 

10 get_model_fields, 

11 model_schema, 

12) 

13 

14if TYPE_CHECKING: 

15 from fast_depends.core import CallModel 

16 

17 

def parse_handler_params(call: "CallModel", prefix: str = "") -> dict[str, Any]:
    """Build the payload schema describing a handler's parameters.

    The model is looked up on ``call.serializer`` when that attribute exists,
    otherwise on ``call`` itself. Only its ``__name__`` is reused: a new
    pydantic model is created from ``call.flat_params`` and handed to
    ``get_model_schema``, with the names in ``call.custom_fields`` excluded.

    Args:
        call: Call model of the handler.
        prefix: Forwarded to ``get_model_schema`` as the title prefix.

    Returns:
        The schema produced by ``get_model_schema``, or an ``EmptyPayload``
        schema of type ``null`` when that call yields ``None``.
    """
    container = getattr(call, "serializer", call)
    model = cast("type[BaseModel] | None", getattr(container, "model", None))
    assert model

    model_name = model.__name__
    field_definitions = {
        param.field_name: (param.field_type, param.default_value)
        for param in call.flat_params
    }
    payload_model = create_model(model_name, **field_definitions)  # type: ignore[call-overload]
    excluded_names = tuple(call.custom_fields.keys())

    schema = get_model_schema(payload_model, prefix=prefix, exclude=excluded_names)
    if schema is not None:
        return schema

    # Nothing left to describe: report an explicit empty payload.
    return {"title": "EmptyPayload", "type": "null"}

37 

38 

@overload
def get_response_schema(call: None, prefix: str = "") -> None: ...


@overload
def get_response_schema(call: "CallModel", prefix: str = "") -> dict[str, Any]: ...


def get_response_schema(
    call: Optional["CallModel"],
    prefix: str = "",
) -> dict[str, Any] | None:
    """Return the schema of the response model attached to ``call``.

    Args:
        call: Call model to inspect; may be ``None``.
        prefix: Forwarded to ``get_model_schema`` as the title prefix.

    Returns:
        Whatever ``get_model_schema`` returns for ``call.response_model``.
        A missing attribute (including ``call`` being ``None``) is treated
        as a ``None`` model.
    """
    # NOTE: FastAPI Dependant object compatibility -- the attribute is read
    # with a default instead of being accessed directly.
    response_model = getattr(call, "response_model", None)
    return get_model_schema(response_model, prefix=prefix)

60 

61 

@overload
def get_model_schema(
    call: None,
    prefix: str = "",
    exclude: Sequence[str] = (),
) -> None: ...


@overload
def get_model_schema(
    call: type[BaseModel],
    prefix: str = "",
    exclude: Sequence[str] = (),
) -> dict[str, Any]: ...


def get_model_schema(
    call: type[BaseModel] | None,
    prefix: str = "",
    exclude: Sequence[str] = (),
) -> dict[str, Any] | None:
    """Get the schema of a model.

    Args:
        call: Model whose fields describe the payload, or ``None``.
        prefix: Used to build the ``"<prefix>:Payload"`` title for schemas
            that do not keep a title of their own.
        exclude: Field names removed from the fields, ``properties`` and
            ``required`` of the resulting schema.

    Returns:
        ``None`` when ``call`` is ``None`` or when no field is left after
        applying ``exclude``. Otherwise a schema dict:

        * a single field annotated with a ``BaseModel`` subclass yields that
          model's own schema;
        * any other single field yields the schema of that field alone
          (its referenced definition when the field is a bare ``$ref``);
        * several fields yield the schema of ``call`` as a whole.
    """
    if call is None:
        return None

    params = {k: v for k, v in get_model_fields(call).items() if k not in exclude}
    params_number = len(params)

    if params_number == 0:
        return None

    model = None
    use_original_model = False
    if params_number == 1:
        name, param = next(iter(params.items()))
        if (
            param.annotation
            and isclass(param.annotation)
            and issubclass(param.annotation, BaseModel)  # NOTE: 3.7-3.10 compatibility
        ):
            model = param.annotation
            use_original_model = True

    if model is None:
        model = call

    body: dict[str, Any] = model_schema(model)
    properties: dict[str, Any] = body.setdefault("properties", {})
    for excluded_name in exclude:
        properties.pop(excluded_name, None)
    if required := body.get("required"):
        body["required"] = [item for item in required if item not in exclude]

    if params_number == 1 and not use_original_model:
        param_body: dict[str, Any] = properties[name]

        if defs := body.get(DEF_KEY):
            # single argument with useless reference
            if ref := param_body.get("$ref"):
                # Pick the definition the reference actually names. Taking
                # the first entry of ``defs`` returns a nested type whenever
                # the referenced one is not listed first.
                ref_key = str(ref).rsplit("/", 1)[-1]
                if ref_key not in defs:
                    # Unrecognised reference format: keep the historical
                    # behaviour of using the first definition.
                    ref_key = next(iter(defs))

                ref_obj: dict[str, Any] = defs[ref_key]
                # Carry the remaining definitions along; filter by key so the
                # returned object is never nested inside itself.
                ref_obj[DEF_KEY] = {k: v for k, v in defs.items() if k != ref_key}
                return ref_obj
            param_body[DEF_KEY] = defs

        original_title = param.title if PYDANTIC_V2 else param.field_info.title

        if original_title:
            # An explicit field title is kept instead of "<prefix>:Payload".
            use_original_model = True
            param_body["title"] = original_title
        else:
            param_body["title"] = name

        body = param_body

    if not use_original_model:
        body["title"] = f"{prefix}:Payload"

    return body