Coverage for faststream / _internal / basic_types.py: 64%

33 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from collections.abc import Awaitable, Callable, Mapping, Sequence 

2from contextlib import AbstractAsyncContextManager 

3from datetime import datetime 

4from decimal import Decimal 

5from typing import ( 

6 Any, 

7 ClassVar, 

8 Protocol, 

9 TypeAlias, 

10 TypeVar, 

11) 

12 

13from typing_extensions import ParamSpec 

14 

15AnyHttpUrl: TypeAlias = str 

16 

17F_Return = TypeVar("F_Return") 

18F_Spec = ParamSpec("F_Spec") 

19 

20AnyCallable: TypeAlias = Callable[..., Any] 

21NoneCallable: TypeAlias = Callable[..., None] 

22AsyncFunc: TypeAlias = Callable[..., Awaitable[Any]] 

23AsyncFuncAny: TypeAlias = Callable[[Any], Awaitable[Any]] 

24 

25DecoratedCallable: TypeAlias = AnyCallable 

26DecoratedCallableNone: TypeAlias = NoneCallable 

27 

28Decorator: TypeAlias = Callable[[AnyCallable], AnyCallable] 

29 

30JsonArray: TypeAlias = Sequence["DecodedMessage"] 

31 

32JsonTable: TypeAlias = dict[str, "DecodedMessage"] 

33 

34JsonDecodable: TypeAlias = bool | bytes | bytearray | float | int | str | None 

35 

36DecodedMessage: TypeAlias = JsonDecodable | JsonArray | JsonTable 

37 

38SendableArray: TypeAlias = Sequence["BaseSendableMessage"] 

39 

40SendableTable: TypeAlias = dict[str, "BaseSendableMessage"] 

41 

42 

43class StandardDataclass(Protocol): 

44 """Protocol to check type is dataclass.""" 

45 

46 __dataclass_fields__: ClassVar[dict[str, Any]] 

47 

48 

49BaseSendableMessage: TypeAlias = ( 

50 JsonDecodable 

51 | Decimal 

52 | datetime 

53 | StandardDataclass 

54 | SendableTable 

55 | SendableArray 

56 | None 

57) 

58 

59try: 

60 from pydantic import BaseModel 

61 

62 HAS_PYDANTIC = True 

63except ImportError: 

64 HAS_PYDANTIC = False 

65 

66try: 

67 from msgspec import Struct 

68 

69 HAS_MSGSPEC = True 

70except ImportError: 

71 HAS_MSGSPEC = False 

72 

73if HAS_PYDANTIC and HAS_MSGSPEC: 73 ↛ 75line 73 didn't jump to line 75 because the condition on line 73 was always true

74 SendableMessage: TypeAlias = Struct | BaseModel | BaseSendableMessage 

75elif HAS_PYDANTIC: 

76 SendableMessage: TypeAlias = BaseModel | BaseSendableMessage # type: ignore[no-redef,misc] 

77elif HAS_MSGSPEC: 

78 SendableMessage: TypeAlias = Struct | BaseSendableMessage # type: ignore[no-redef,misc] 

79else: 

80 SendableMessage: TypeAlias = BaseSendableMessage # type: ignore[no-redef,misc] 

81 

82SettingField: TypeAlias = ( 

83 bool | str | list[bool | str] | list[str] | list[bool] | int | None 

84) 

85 

86Lifespan: TypeAlias = Callable[..., AbstractAsyncContextManager[None]] 

87 

88 

89class LoggerProto(Protocol): 

90 def log( 

91 self, 

92 level: int, 

93 msg: Any, 

94 /, 

95 *, 

96 exc_info: Any = None, 

97 extra: Mapping[str, Any] | None = None, 

98 ) -> None: ...