Coverage for tests/brokers/base/connection.py: 95%
20 statements
« prev ^ index » next · coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1from typing import Any
3import pytest
5from faststream._internal.broker import BrokerUsecase
class BrokerConnectionTestcase:
    """Reusable connection test cases for a broker class.

    The base class only declares ``broker``; a concrete suite is expected to
    assign the broker class under test and may override ``get_broker_args``
    to supply constructor arguments.
    """

    # Broker class under test; declared here without a value.
    broker: type[BrokerUsecase]

    def get_broker_args(self, settings: Any) -> dict[str, Any]:
        """Return keyword arguments used to build the broker in ``test_connect``.

        The base implementation ignores ``settings`` and returns an empty dict.
        """
        return {}

    @pytest.mark.asyncio()
    async def ping(self, broker) -> bool:
        """Return the result of ``broker.ping`` called with ``timeout=5.0``."""
        return await broker.ping(timeout=5.0)

    @pytest.mark.asyncio()
    async def test_stop_before_start(self) -> None:
        """Stopping a broker that was never connected leaves it not running."""
        br = self.broker()
        assert br._connection is None
        await br.stop()
        assert not br.running

    @pytest.mark.asyncio()
    async def test_connect(self, settings: Any) -> None:
        """Connect a broker built from ``get_broker_args`` and check it pings."""
        kwargs = self.get_broker_args(settings)
        broker = self.broker(**kwargs)
        await broker.connect()
        try:
            assert await self.ping(broker)
        finally:
            # Always stop the broker, even when the ping assertion fails or
            # ping raises, so a failing test does not leave it connected.
            await broker.stop()