Coverage for faststream / _internal / broker / broker.py: 98%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from abc import abstractmethod 

2from collections.abc import Iterable, Sequence 

3from typing import TYPE_CHECKING, Any, Generic, Optional 

4 

5from fast_depends import Provider 

6from typing_extensions import Self 

7 

8from faststream._internal.configs import BrokerConfigType 

9from faststream._internal.types import ( 

10 BrokerMiddleware, 

11 ConnectionType, 

12 MsgType, 

13) 

14 

15from .pub_base import BrokerPublishMixin 

16from .registrator import Registrator 

17 

18if TYPE_CHECKING: 

19 from types import TracebackType 

20 

21 from faststream._internal.context.repository import ContextRepo 

22 from faststream._internal.di import FastDependsConfig 

23 from faststream._internal.producer import ProducerProto 

24 from faststream.specification.schema import BrokerSpec 

25 

26 

27class BrokerUsecase( 

28 Registrator[MsgType, BrokerConfigType], 

29 BrokerPublishMixin[MsgType], 

30 Generic[MsgType, ConnectionType, BrokerConfigType], 

31): 

32 """Basic class for brokers-only. 

33 

34 Extends `Registrator` by connection, publish and AsyncAPI behavior. 

35 """ 

36 

37 _connection: ConnectionType | None 

38 

39 def __init__( 

40 self, 

41 *, 

42 config: BrokerConfigType, 

43 specification: "BrokerSpec", 

44 routers: Iterable[Registrator[Any, Any]], 

45 **connection_kwargs: Any, 

46 ) -> None: 

47 super().__init__( 

48 routers=routers, 

49 config=config, 

50 ) 

51 self.specification = specification 

52 

53 self.running = False 

54 

55 self._connection_kwargs = connection_kwargs 

56 self._connection = None 

57 

58 @property 

59 def middlewares(self) -> Sequence["BrokerMiddleware[MsgType]"]: 

60 return self.config.broker_middlewares 

61 

62 @property 

63 def _producer(self) -> "ProducerProto": 

64 return self.config.producer 

65 

66 @property 

67 def context(self) -> "ContextRepo": 

68 return self.config.fd_config.context 

69 

70 @property 

71 def provider(self) -> Provider: 

72 return self.config.fd_config.provider 

73 

74 async def __aenter__(self) -> "Self": 

75 await self.connect() 

76 return self 

77 

78 async def __aexit__( 

79 self, 

80 exc_type: type[BaseException] | None, 

81 exc_val: BaseException | None, 

82 exc_tb: Optional["TracebackType"], 

83 ) -> None: 

84 await self.stop(exc_type, exc_val, exc_tb) 

85 

86 def _update_fd_config(self, config: "FastDependsConfig") -> None: 

87 """Private method to change broker config state by outer application.""" 

88 self.config.fd_config = config | self.config.fd_config 

89 

90 async def start(self) -> None: 

91 # TODO: filter by already running handlers after TestClient refactor 

92 for sub in self.subscribers: 

93 await sub.start() 

94 

95 for pub in self.publishers: 

96 await pub.start() 

97 

98 self.running = True 

99 

100 def _setup_logger(self) -> None: 

101 for sub in self.subscribers: 

102 log_context = sub.get_log_context(None) 

103 log_context.pop("message_id", None) 

104 self.config.logger.params_storage.register_subscriber(log_context) 

105 

106 self.config.logger._setup(self.config.fd_config.context) 

107 

108 async def connect(self) -> ConnectionType: 

109 """Connect to a remote server.""" 

110 if self._connection is None: 

111 self._connection = await self._connect() 

112 self._setup_logger() 

113 

114 return self._connection 

115 

116 @abstractmethod 

117 async def _connect(self) -> ConnectionType: 

118 raise NotImplementedError 

119 

120 async def stop( 

121 self, 

122 exc_type: type[BaseException] | None = None, 

123 exc_val: BaseException | None = None, 

124 exc_tb: Optional["TracebackType"] = None, 

125 ) -> None: 

126 """Closes the object.""" 

127 for sub in self.subscribers: 

128 await sub.stop() 

129 

130 self.running = False 

131 

132 @abstractmethod 

133 async def ping(self, timeout: float | None) -> bool: 

134 """Check connection alive.""" 

135 raise NotImplementedError