Coverage for tests/conftest.py: 97%

37 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-08 01:48 +0000

1import asyncio 

2from collections.abc import Generator 

3from typing import Any 

4from unittest.mock import AsyncMock, MagicMock 

5from uuid import uuid4 

6 

7import pytest 

8from typer.testing import CliRunner 

9 

10from faststream.__about__ import __version__ 

11from faststream._internal.context import ContextRepo 

12 

13 

@pytest.hookimpl(tryfirst=True)
def pytest_keyboard_interrupt(
    excinfo: pytest.ExceptionInfo[KeyboardInterrupt],
) -> None:  # pragma: no cover
    """Hook invoked by pytest when the session is interrupted from the keyboard.

    Registered with ``tryfirst=True`` so it is ordered ahead of other
    implementations of the same hook.

    Args:
        excinfo: Information about the ``KeyboardInterrupt``; not used here.
    """
    # NOTE(review): this only builds a ``skip`` mark object and discards it;
    # nothing is attached to any test item. Confirm whether that is intended.
    reason = "Interrupted Test Session"
    pytest.mark.skip(reason)

19 

20 

def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
    """Tag every collected test item with the ``all`` marker.

    Args:
        items: The test items pytest collected; each one is marked in place.
    """
    marker_name = "all"
    for collected in items:
        collected.add_marker(marker_name)

24 

25 

@pytest.fixture()
def queue() -> str:
    """Provide a random queue name: the string form of a fresh UUID4."""
    unique_id = uuid4()
    return str(unique_id)

29 

30 

@pytest.fixture()
def event() -> asyncio.Event:
    """Provide a newly created, unset ``asyncio.Event``."""
    fresh_event = asyncio.Event()
    return fresh_event

34 

35 

@pytest.fixture(scope="session")
def runner() -> CliRunner:
    """Provide a Typer ``CliRunner``; session-scoped, so it is built once per run."""
    cli_runner = CliRunner()
    return cli_runner

39 

40 

@pytest.fixture()
def mock() -> Generator[MagicMock, Any, None]:
    """Yield a ``MagicMock`` and reset it once the requesting test is done.

    Written as a generator so the ``reset_mock()`` call runs as teardown
    after the test that used the fixture.
    """
    magic = MagicMock()
    yield magic
    # Teardown: clear recorded calls on the mock.
    magic.reset_mock()

47 

48 

@pytest.fixture()
def async_mock() -> Generator[AsyncMock, Any, None]:
    """Yield an ``AsyncMock`` and reset it once the requesting test is done.

    Written as a generator so the ``reset_mock()`` call runs as teardown
    after the test that used the fixture.
    """
    awaitable_mock = AsyncMock()
    yield awaitable_mock
    # Teardown: clear recorded calls on the mock.
    awaitable_mock.reset_mock()

55 

56 

@pytest.fixture(scope="session")
def version() -> str:
    """Provide the package version string imported from ``faststream.__about__``."""
    package_version = __version__
    return package_version

60 

61 

@pytest.fixture()
def context() -> ContextRepo:
    """Provide a newly constructed ``ContextRepo`` instance."""
    repo = ContextRepo()
    return repo

65 

66 

@pytest.fixture()
def kafka_basic_project() -> str:
    """Provide the ``module:attribute`` import string for the basic Kafka docs app."""
    app_path = "docs.docs_src.kafka.basic.basic:app"
    return app_path

70 

71 

@pytest.fixture()
def kafka_ascynapi_project() -> str:
    """Provide the ``module:attribute`` import string ending in ``:asyncapi``.

    NOTE(review): the fixture name looks like a misspelling of "asyncapi";
    it is kept as-is because tests request the fixture by this name.
    """
    asyncapi_path = "docs.docs_src.kafka.basic.basic:asyncapi"
    return asyncapi_path

75 

76 

@pytest.fixture(autouse=True)
def disable_supervisor(monkeypatch: pytest.MonkeyPatch) -> None:
    """Set ``FASTSTREAM_SUPERVISOR_DISABLED=1`` in the environment for every test.

    The fixture is ``autouse``, so it applies without being requested. The
    variable is set through ``monkeypatch``, which restores the previous
    environment when the test finishes.

    Args:
        monkeypatch: pytest's built-in fixture for temporary patching.
    """
    # NOTE(review): presumably this switches off FastStream's supervisor
    # process during tests; confirm against the code that reads the variable.
    monkeypatch.setenv("FASTSTREAM_SUPERVISOR_DISABLED", "1")