Coverage for tests / brokers / supervisor / test_supervisor.py: 95%
43 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-08 01:48 +0000
1import asyncio
2import logging
3from contextlib import suppress
4from unittest.mock import patch
6import pytest
8from faststream._internal.endpoint.subscriber.supervisor import (
9 TaskCallbackSupervisor,
10 _SupervisorCache,
11)
14@pytest.mark.asyncio()
15async def test_task_failing(subscriber_with_task_mixin):
16 async def failing_task():
17 raise ValueError
19 logging.disable(logging.CRITICAL + 1)
21 task = subscriber_with_task_mixin.add_task(failing_task)
22 with suppress(ValueError):
23 await task
25 assert len(subscriber_with_task_mixin.tasks) > 1
26 assert len(TaskCallbackSupervisor._TaskCallbackSupervisor__cache) == 1
29@pytest.mark.asyncio()
30async def test_task_successful(subscriber_with_task_mixin):
31 async def successful_task():
32 return True
34 task = subscriber_with_task_mixin.add_task(successful_task)
35 await task
36 assert len(subscriber_with_task_mixin.tasks) == 1
37 assert task.result()
40@pytest.mark.asyncio()
41@pytest.mark.slow()
42async def test_ignore_cancellation_error(subscriber_with_task_mixin):
43 async def cancelled_task():
44 await asyncio.sleep(10)
45 return True
47 task = subscriber_with_task_mixin.add_task(cancelled_task)
48 task.cancel()
49 with pytest.raises(asyncio.CancelledError):
50 await task
52 await asyncio.sleep(3)
53 assert len(subscriber_with_task_mixin.tasks) == 1
56def test_supervisor_cache(monkeypatch):
57 with patch("time.time") as mocked_time:
58 mocked_time.return_value = 0
59 cache = _SupervisorCache()
60 cache.add(1)
61 cache.add(2)
62 mocked_time.return_value = cache.ttl - 1
63 assert 1 in cache
64 assert 2 in cache
65 assert 1 in cache
66 mocked_time.return_value = cache.ttl + 1
67 assert 1 not in cache
68 assert 2 not in cache