Coverage for faststream / specification / asyncapi / message.py: 96%
50 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Sequence
2from inspect import isclass
3from typing import TYPE_CHECKING, Any, Optional, cast, overload
5from pydantic import BaseModel, create_model
7from faststream._internal._compat import (
8 DEF_KEY,
9 PYDANTIC_V2,
10 get_model_fields,
11 model_schema,
12)
14if TYPE_CHECKING:
15 from fast_depends.core import CallModel
def parse_handler_params(call: "CallModel", prefix: str = "") -> dict[str, Any]:
    """Build the payload schema describing a handler's parameters.

    The pydantic model is looked up on ``call.serializer`` when that attribute
    exists, otherwise on ``call`` itself. Only its ``__name__`` is reused: a
    fresh model is created from ``call.flat_params`` and passed to
    ``get_model_schema`` with the keys of ``call.custom_fields`` excluded.

    Args:
        call: The call model describing the handler.
        prefix: Forwarded to ``get_model_schema`` as the title prefix.

    Returns:
        The schema produced by ``get_model_schema``, or an ``EmptyPayload``
        null schema when that function returns ``None``.
    """
    holder = getattr(call, "serializer", call)
    model = cast("type[BaseModel] | None", getattr(holder, "model", None))
    assert model

    # One (type, default) pair per flattened handler parameter.
    field_definitions = {
        p.field_name: (p.field_type, p.default_value) for p in call.flat_params
    }
    payload_model = create_model(model.__name__, **field_definitions)  # type: ignore[call-overload]
    excluded = tuple(call.custom_fields.keys())

    schema = get_model_schema(payload_model, prefix=prefix, exclude=excluded)
    if schema is not None:
        return schema

    return {"title": "EmptyPayload", "type": "null"}
@overload
def get_response_schema(call: None, prefix: str = "") -> None: ...


@overload
def get_response_schema(call: "CallModel", prefix: str = "") -> dict[str, Any]: ...


def get_response_schema(
    call: Optional["CallModel"],
    prefix: str = "",
) -> dict[str, Any] | None:
    """Return the schema of the response model attached to ``call``.

    The ``response_model`` attribute is read with a ``None`` default, so a
    ``None`` call, or an object without that attribute, passes ``None`` on to
    ``get_model_schema``.

    Args:
        call: The call model to inspect, or ``None``.
        prefix: Forwarded to ``get_model_schema`` as the title prefix.

    Returns:
        Whatever ``get_model_schema`` returns for the response model.
    """
    # NOTE: FastAPI Dependant object compatibility
    response_model = getattr(call, "response_model", None)
    return get_model_schema(response_model, prefix=prefix)
@overload
def get_model_schema(
    call: None,
    prefix: str = "",
    exclude: Sequence[str] = (),
) -> None: ...


@overload
def get_model_schema(
    call: type[BaseModel],
    prefix: str = "",
    exclude: Sequence[str] = (),
) -> dict[str, Any]: ...


def get_model_schema(
    call: type[BaseModel] | None,
    prefix: str = "",
    exclude: Sequence[str] = (),
) -> dict[str, Any] | None:
    """Get the schema of a model.

    Fields named in ``exclude`` are ignored. The shape of the result depends
    on how many fields remain:

    * no fields: ``None`` is returned;
    * exactly one field annotated with a ``BaseModel`` subclass: the schema of
      that model is returned with its own title;
    * exactly one other field: the schema is unwrapped to that field's own
      property schema. If the property is only a ``$ref`` into the schema
      definitions, the referenced definition is returned, carrying the other
      definitions under ``DEF_KEY``;
    * otherwise: the schema of ``call`` itself is returned.

    Unless the original model schema is used, a ``$ref`` definition is
    returned, or the single field declares an explicit title, the resulting
    title is ``f"{prefix}:Payload"``.

    Args:
        call: The model to describe, or ``None``.
        prefix: Prefix used when generating the ``"<prefix>:Payload"`` title.
        exclude: Field names to drop from the schema.

    Returns:
        The schema dictionary, or ``None`` when ``call`` is ``None`` or no
        fields remain after exclusion.
    """
    if call is None:
        return None

    params = {k: v for k, v in get_model_fields(call).items() if k not in exclude}
    params_number = len(params)

    if params_number == 0:
        return None

    model = None
    use_original_model = False
    if params_number == 1:
        name, param = next(iter(params.items()))
        if (
            param.annotation
            and isclass(param.annotation)
            and issubclass(param.annotation, BaseModel)  # NOTE: 3.7-3.10 compatibility
        ):
            model = param.annotation
            use_original_model = True

    if model is None:
        model = call

    body: dict[str, Any] = model_schema(model)
    body["properties"] = body.get("properties", {})
    for i in exclude:
        body["properties"].pop(i, None)
    if required := body.get("required"):
        body["required"] = [x for x in required if x not in exclude]

    if params_number == 1 and not use_original_model:
        param_body: dict[str, Any] = body.get("properties", {})
        param_body = param_body[name]

        if defs := body.get(DEF_KEY):
            # single argument with useless reference
            if ref := param_body.get("$ref"):
                # Resolve the definition the reference actually points at.
                # The first entry of `defs` is not necessarily the referenced
                # one when the type pulls in further nested definitions.
                ref_key = str(ref).rsplit("/", 1)[-1]
                if ref_key in defs:
                    ref_obj: dict[str, Any] = defs[ref_key]
                else:
                    # Unresolvable reference: keep the previous behaviour
                    # (first definition, matched against its title).
                    ref_obj = next(iter(defs.values()))
                    ref_key = ref_obj.get("title")
                ref_obj[DEF_KEY] = {k: v for k, v in defs.items() if k != ref_key}
                return ref_obj
            param_body[DEF_KEY] = defs

        original_title = param.title if PYDANTIC_V2 else param.field_info.title

        if original_title:
            # An explicit field title wins over the generated payload title.
            use_original_model = True
            param_body["title"] = original_title
        else:
            param_body["title"] = name

        body = param_body

    if not use_original_model:
        body["title"] = f"{prefix}:Payload"

    return body