Coverage for tests / brokers / base / basic.py: 75%

4 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from abc import abstractmethod 

2from typing import Any 

3 

4from faststream._internal.broker import BrokerUsecase 

5from faststream._internal.broker.router import BrokerRouter 

6 

7 

class BaseTestcaseConfig:
    """Shared configuration hooks for broker test cases.

    Subclasses supply the concrete broker and router through
    ``get_broker`` and ``get_router``; the remaining methods are
    pass-through defaults that a subclass may override.

    NOTE(review): the class declares no ``ABCMeta`` metaclass, so the
    ``@abstractmethod`` markers below are not enforced at instantiation.
    The unimplemented hooks fail only when called, by raising
    ``NotImplementedError``.
    """

    # Default timeout; presumably in seconds — confirm against the tests
    # that read it.
    timeout: float = 3.0

    @abstractmethod
    def get_broker(
        self,
        apply_types: bool = False,
        **kwargs: Any,
    ) -> BrokerUsecase[Any, Any]:
        """Return the broker under test; must be overridden.

        Args:
            apply_types: Flag defaulting to ``False``; presumably passed on
                to the broker by subclasses — confirm in the overrides.
            **kwargs: Extra keyword options for the subclass to interpret.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    def patch_broker(self, broker: BrokerUsecase, **kwargs: Any) -> BrokerUsecase:
        """Return ``broker`` untouched.

        The base implementation ignores ``kwargs`` and hands back the very
        same object it received.
        """
        return broker

    def get_subscriber_params(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> tuple[tuple[Any, ...], dict[str, Any]]:
        """Return the received arguments as an ``(args, kwargs)`` pair.

        The base implementation performs no transformation: the positional
        arguments come back as a tuple and the keyword arguments as a dict.
        """
        return (args, kwargs)

    @abstractmethod
    def get_router(self, **kwargs: Any) -> BrokerRouter:
        """Return the router under test; must be overridden.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError