Coverage for tests/test_utils.py: 98.31%
51 statements
« prev ^ index » next coverage.py v7.6.7, created at 2025-01-25 16:43 +0000
« prev ^ index » next coverage.py v7.6.7, created at 2025-01-25 16:43 +0000
1from __future__ import annotations as _annotations
3import asyncio
4import os
5from collections.abc import AsyncIterator
6from importlib.metadata import distributions
8import pytest
9from inline_snapshot import snapshot
11from pydantic_ai import UserError
12from pydantic_ai._utils import UNSET, PeekableAsyncStream, check_object_json_schema, group_by_temporal
14from .models.mock_async_stream import MockAsyncStream
16pytestmark = pytest.mark.anyio
19@pytest.mark.parametrize(
20 'interval,expected',
21 [
22 (None, snapshot([[1], [2], [3]])),
23 (0, snapshot([[1], [2], [3]])),
24 (0.02, snapshot([[1], [2], [3]])),
25 (0.04, snapshot([[1, 2], [3]])),
26 (0.1, snapshot([[1, 2, 3]])),
27 ],
28)
29async def test_group_by_temporal(interval: float | None, expected: list[list[int]]):
30 async def yield_groups() -> AsyncIterator[int]:
31 yield 1
32 await asyncio.sleep(0.02)
33 yield 2
34 await asyncio.sleep(0.02)
35 yield 3
36 await asyncio.sleep(0.02)
38 async with group_by_temporal(yield_groups(), soft_max_interval=interval) as groups_iter:
39 groups: list[list[int]] = [g async for g in groups_iter]
40 assert groups == expected
43def test_check_object_json_schema():
44 object_schema = {'type': 'object', 'properties': {'a': {'type': 'string'}}}
45 assert check_object_json_schema(object_schema) == object_schema
47 array_schema = {'type': 'array', 'items': {'type': 'string'}}
48 with pytest.raises(UserError, match='^Schema must be an object$'):
49 check_object_json_schema(array_schema)
52@pytest.mark.parametrize('peek_first', [True, False])
53@pytest.mark.anyio
54async def test_peekable_async_stream(peek_first: bool):
55 async_stream = MockAsyncStream(iter([1, 2, 3]))
56 peekable_async_stream = PeekableAsyncStream(async_stream)
58 items: list[int] = []
60 # We need to both peek before starting the stream, and not, to achieve full coverage
61 if peek_first:
62 assert not await peekable_async_stream.is_exhausted()
63 assert await peekable_async_stream.peek() == 1
65 async for item in peekable_async_stream:
66 items.append(item)
68 # The next line is included mostly for the sake of achieving coverage
69 assert await peekable_async_stream.peek() == (item + 1 if item < 3 else UNSET)
71 assert await peekable_async_stream.is_exhausted()
72 assert await peekable_async_stream.peek() is UNSET
73 assert items == [1, 2, 3]
76def test_package_versions(capsys: pytest.CaptureFixture[str]):
77 if os.getenv('CI'): 77 ↛ exitline 77 didn't return from function 'test_package_versions' because the condition on line 77 was always true
78 with capsys.disabled():
79 print('\npackage versions:')
80 packages = sorted((package.metadata['Name'], package.version) for package in distributions())
81 for name, version in packages:
82 print(f'{name:30} {version}')