Coverage for faststream / _internal / basic_types.py: 64%
33 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from collections.abc import Awaitable, Callable, Mapping, Sequence
2from contextlib import AbstractAsyncContextManager
3from datetime import datetime
4from decimal import Decimal
5from typing import (
6 Any,
7 ClassVar,
8 Protocol,
9 TypeAlias,
10 TypeVar,
11)
13from typing_extensions import ParamSpec
# AnyHttpUrl is a plain alias of ``str``; no URL validation is attached here.
AnyHttpUrl: TypeAlias = str

# Generic placeholders for annotating callables: F_Return is an unconstrained
# type variable and F_Spec a parameter specification.
# NOTE(review): their use sites are outside this module -- presumably the
# return type and parameters of wrapped functions; confirm against callers.
F_Return = TypeVar("F_Return")
F_Spec = ParamSpec("F_Spec")
# Callable shapes. ``...`` means "any parameter list".
AnyCallable: TypeAlias = Callable[..., Any]  # any arguments, any result
NoneCallable: TypeAlias = Callable[..., None]  # any arguments, returns None
AsyncFunc: TypeAlias = Callable[..., Awaitable[Any]]  # any arguments, awaitable result
# Exactly one positional argument (of any type), awaitable result.
AsyncFuncAny: TypeAlias = Callable[[Any], Awaitable[Any]]

# Second names for the two aliases above; they add no extra constraints.
DecoratedCallable: TypeAlias = AnyCallable
DecoratedCallableNone: TypeAlias = NoneCallable

# A decorator: takes a callable and returns a callable.
Decorator: TypeAlias = Callable[[AnyCallable], AnyCallable]
# Recursive description of a decoded payload. "DecodedMessage" is quoted in
# the two container aliases because it is only defined a few lines below.
JsonArray: TypeAlias = Sequence["DecodedMessage"]

JsonTable: TypeAlias = dict[str, "DecodedMessage"]

# Leaf (non-container) values.
JsonDecodable: TypeAlias = bool | bytes | bytearray | float | int | str | None

# A decoded message is a leaf value, a sequence of messages, or a
# string-keyed dict of messages.
DecodedMessage: TypeAlias = JsonDecodable | JsonArray | JsonTable

# Container forms of a sendable message. "BaseSendableMessage" is quoted
# because it is defined later in the module.
SendableArray: TypeAlias = Sequence["BaseSendableMessage"]

SendableTable: TypeAlias = dict[str, "BaseSendableMessage"]
class StandardDataclass(Protocol):
    """Structural type used to recognise dataclasses.

    Any class that exposes a class-level ``__dataclass_fields__`` mapping
    satisfies this protocol for static type checkers.
    """

    # Class-level mapping keyed by ``str``; the value type is left as ``Any``.
    __dataclass_fields__: ClassVar[dict[str, Any]]
# Values accepted as an outgoing message without any optional third-party
# model types: the JSON leaf values plus Decimal, datetime, dataclass
# instances and (recursively) dicts / sequences of the same.
# The trailing ``None`` repeats the one already contained in JsonDecodable;
# it does not change the resulting union.
BaseSendableMessage: TypeAlias = (
    JsonDecodable
    | Decimal
    | datetime
    | StandardDataclass
    | SendableTable
    | SendableArray
    | None
)
# Probe for the optional pydantic dependency; the flag records whether the
# import succeeded so the alias below can include ``BaseModel``.
try:
    from pydantic import BaseModel

    HAS_PYDANTIC = True
except ImportError:
    HAS_PYDANTIC = False

# Same probe for the optional msgspec dependency (``Struct``).
try:
    from msgspec import Struct

    HAS_MSGSPEC = True
except ImportError:
    HAS_MSGSPEC = False

# SendableMessage widens BaseSendableMessage with whichever optional model
# types were importable. Exactly one branch runs at import time; the
# ``type: ignore`` markers silence the checker's complaints about the alias
# being declared once per branch.
# NOTE: in the coverage run this listing was taken from, both libraries were
# installed, so only the first branch was exercised.
if HAS_PYDANTIC and HAS_MSGSPEC:
    SendableMessage: TypeAlias = Struct | BaseModel | BaseSendableMessage
elif HAS_PYDANTIC:
    SendableMessage: TypeAlias = BaseModel | BaseSendableMessage  # type: ignore[no-redef,misc]
elif HAS_MSGSPEC:
    SendableMessage: TypeAlias = Struct | BaseSendableMessage  # type: ignore[no-redef,misc]
else:
    SendableMessage: TypeAlias = BaseSendableMessage  # type: ignore[no-redef,misc]
# Allowed value types for a setting: a bool, str or int, a list of bools
# and/or strs, or None.
SettingField: TypeAlias = (
    bool | str | list[bool | str] | list[str] | list[bool] | int | None
)

# A lifespan hook: a callable with any parameters that returns an async
# context manager whose ``async with`` target is None.
Lifespan: TypeAlias = Callable[..., AbstractAsyncContextManager[None]]
class LoggerProto(Protocol):
    """Structural interface for logger objects.

    An object satisfies this protocol if it provides a ``log`` method with
    the signature below.
    NOTE(review): the shape resembles ``logging.Logger.log`` -- presumably
    standard-library loggers are meant to satisfy it; confirm.
    """

    # ``level`` and ``msg`` are positional-only (they precede ``/``);
    # ``exc_info`` and ``extra`` are keyword-only (they follow ``*``) and
    # both default to None. The method returns nothing.
    def log(
        self,
        level: int,
        msg: Any,
        /,
        *,
        exc_info: Any = None,
        extra: Mapping[str, Any] | None = None,
    ) -> None: ...