Coverage for tests / brokers / base / connection.py: 95%

20 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1from typing import Any 

2 

3import pytest 

4 

5from faststream._internal.broker import BrokerUsecase 

6 

7 

class BrokerConnectionTestcase:
    """Reusable connection test cases for a broker class.

    The ``broker`` attribute is only annotated here, so a concrete value is
    expected to be supplied elsewhere (presumably by per-broker subclasses --
    confirm against the concrete test modules).
    """

    # Broker class instantiated by every test in this suite.
    broker: type[BrokerUsecase]

    def get_broker_args(self, settings: Any) -> dict[str, Any]:
        """Return the keyword arguments used to build the broker in ``test_connect``.

        The base implementation ignores ``settings`` and returns an empty
        dict, i.e. the broker is constructed without arguments.
        """
        return {}

    # Helper coroutine awaited directly by ``test_connect``; it is not a test
    # itself, so it carries no ``pytest.mark.asyncio`` marker.
    async def ping(self, broker) -> bool:
        """Return the result of ``broker.ping(timeout=5.0)``."""
        return await broker.ping(timeout=5.0)

    @pytest.mark.asyncio()
    async def test_stop_before_start(self) -> None:
        """Stopping a broker that was never connected leaves it not running."""
        br = self.broker()
        assert br._connection is None
        await br.stop()
        assert not br.running

    @pytest.mark.asyncio()
    async def test_connect(self, settings: Any) -> None:
        """Connect a broker built from ``get_broker_args`` and check that it pings."""
        kwargs = self.get_broker_args(settings)
        broker = self.broker(**kwargs)
        await broker.connect()
        try:
            assert await self.ping(broker)
        finally:
            # Always stop the broker, even when the ping assertion fails, so a
            # failing test does not leave a live connection behind.
            await broker.stop()