Coverage for tests / brokers / base / basic.py: 75%
4 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from abc import abstractmethod
2from typing import Any
4from faststream._internal.broker import BrokerUsecase
5from faststream._internal.broker.router import BrokerRouter
class BaseTestcaseConfig:
    """Base set of hooks shared by broker test cases.

    Subclasses supply ``get_broker`` and ``get_router``; the remaining
    hooks have pass-through defaults that can be overridden when needed.

    NOTE: this class does not derive from ``abc.ABC``, so the
    ``@abstractmethod`` markers below are not enforced at instantiation
    time. A subclass that forgets to override one of them only fails when
    the method is called, via ``NotImplementedError``.
    """

    # Default timeout for test cases; presumably seconds -- confirm against
    # the tests that read this attribute.
    timeout: float = 3.0

    @abstractmethod
    def get_broker(
        self,
        apply_types: bool = False,
        **kwargs: Any,
    ) -> BrokerUsecase[Any, Any]:
        """Return the broker under test.

        Args:
            apply_types: Flag for the concrete implementation to interpret;
                unused by this base class.
            **kwargs: Extra options for the concrete implementation.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def patch_broker(
        self,
        broker: BrokerUsecase[Any, Any],
        **kwargs: Any,
    ) -> BrokerUsecase[Any, Any]:
        """Return ``broker`` unchanged.

        Override point for subclasses that need to wrap or alter the broker
        before a test uses it. ``kwargs`` are accepted and ignored here.
        """
        return broker

    def get_subscriber_params(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> tuple[
        tuple[Any, ...],
        dict[str, Any],
    ]:
        """Return the received arguments as an ``(args, kwargs)`` pair.

        The default passes everything through untouched; subclasses may
        override it to adjust the positional and keyword arguments.
        """
        return args, kwargs

    @abstractmethod
    def get_router(self, **kwargs: Any) -> BrokerRouter:
        """Return the router under test.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError