Coverage for tests / conftest.py: 97%
37 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2from collections.abc import Generator
3from typing import Any
4from unittest.mock import AsyncMock, MagicMock
5from uuid import uuid4
7import pytest
8from typer.testing import CliRunner
10from faststream.__about__ import __version__
11from faststream._internal.context import ContextRepo
@pytest.hookimpl(tryfirst=True)
def pytest_keyboard_interrupt(
    excinfo: pytest.ExceptionInfo[KeyboardInterrupt],
) -> None:  # pragma: no cover
    """Hook implementation for keyboard interrupts, registered with ``tryfirst=True``.

    Args:
        excinfo: Exception info for the ``KeyboardInterrupt``; not read here.
    """
    # NOTE(review): this expression only builds a ``skip`` mark and discards
    # it; nothing is attached to any test item. Confirm the intended behaviour.
    pytest.mark.skip("Interrupted Test Session")
def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
    """Attach the ``all`` marker to every collected item.

    Args:
        items: Collected items; each one is marked in place.
    """
    marker_name = "all"
    for collected in items:
        collected.add_marker(marker_name)
@pytest.fixture()
def queue() -> str:
    """Return a freshly generated UUID4 rendered as a string."""
    unique_id = uuid4()
    return str(unique_id)
@pytest.fixture()
def event() -> asyncio.Event:
    """Return a new ``asyncio.Event`` instance."""
    fresh_event = asyncio.Event()
    return fresh_event
@pytest.fixture(scope="session")
def runner() -> CliRunner:
    """Return a ``CliRunner``; session scope means it is built once per test run."""
    cli_runner = CliRunner()
    return cli_runner
@pytest.fixture()
def mock() -> Generator[MagicMock, Any, None]:
    """Yield a new ``MagicMock`` and reset it after the yield.

    Written as a generator so the ``reset_mock()`` call runs as the
    fixture's post-yield step.
    """
    magic = MagicMock()
    yield magic
    # Post-yield step: clear recorded calls on the mock.
    magic.reset_mock()
@pytest.fixture()
def async_mock() -> Generator[AsyncMock, Any, None]:
    """Yield a new ``AsyncMock`` and reset it after the yield.

    Written as a generator so the ``reset_mock()`` call runs as the
    fixture's post-yield step.
    """
    awaitable_mock = AsyncMock()
    yield awaitable_mock
    # Post-yield step: clear recorded calls on the mock.
    awaitable_mock.reset_mock()
@pytest.fixture(scope="session")
def version() -> str:
    """Return ``faststream.__about__.__version__`` (session-scoped)."""
    current_version = __version__
    return current_version
@pytest.fixture()
def context() -> ContextRepo:
    """Return a newly constructed ``ContextRepo``."""
    repo = ContextRepo()
    return repo
@pytest.fixture()
def kafka_basic_project() -> str:
    """Return the ``module:attribute`` path string ending in ``:app``."""
    app_path = "docs.docs_src.kafka.basic.basic:app"
    return app_path
@pytest.fixture()
def kafka_ascynapi_project() -> str:
    """Return the ``module:attribute`` path string ending in ``:asyncapi``."""
    asyncapi_path = "docs.docs_src.kafka.basic.basic:asyncapi"
    return asyncapi_path
@pytest.fixture(autouse=True)
def disable_supervisor(monkeypatch: pytest.MonkeyPatch) -> None:
    """Set ``FASTSTREAM_SUPERVISOR_DISABLED=1`` in the environment.

    Declared ``autouse=True``, so it applies without being requested. The
    variable is set through ``monkeypatch``, which undoes the change when
    the fixture is torn down.

    Args:
        monkeypatch: pytest's built-in environment/attribute patching fixture.
    """
    monkeypatch.setenv("FASTSTREAM_SUPERVISOR_DISABLED", "1")