Coverage for tests/test_utils.py: 98.36%
53 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-28 17:27 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-28 17:27 +0000
1from __future__ import annotations as _annotations
3import asyncio
4import os
5from collections.abc import AsyncIterator
6from importlib.metadata import distributions
8import pytest
9from inline_snapshot import snapshot
11from pydantic_ai import UserError
12from pydantic_ai._utils import UNSET, PeekableAsyncStream, check_object_json_schema, group_by_temporal
14from .models.mock_async_stream import MockAsyncStream
16pytestmark = pytest.mark.anyio
@pytest.mark.parametrize(
    'interval,expected',
    [
        (None, snapshot([[1], [2], [3]])),
        (0, snapshot([[1], [2], [3]])),
        (0.02, snapshot([[1], [2], [3]])),
        (0.04, snapshot([[1, 2], [3]])),
        (0.1, snapshot([[1, 2, 3]])),
    ],
)
async def test_group_by_temporal(interval: float | None, expected: list[list[int]]):
    """Check how `group_by_temporal` batches a slow stream for each `soft_max_interval`.

    The source stream emits 1, 2 and 3, pausing 0.02 seconds after every item.
    Each parametrized case pairs an interval with the batches it should produce.
    """
    # NOTE(review): expectations rely on real `asyncio.sleep` timing — presumably
    # sensitive to scheduler jitter on a heavily loaded machine; confirm if flaky.

    async def timed_source() -> AsyncIterator[int]:
        for number in (1, 2, 3):
            yield number
            await asyncio.sleep(0.02)

    collected: list[list[int]] = []
    async with group_by_temporal(timed_source(), soft_max_interval=interval) as batches:
        async for batch in batches:
            collected.append(batch)

    assert collected == expected
def test_check_object_json_schema():
    """Exercise `check_object_json_schema` on object, `$ref`-based and array schemas.

    * A plain object schema is returned as an equal schema.
    * A schema whose root is a `$ref` into `$defs` yields the referenced definition.
    * A non-object (array) schema raises `UserError`.
    """

    def json_model_definition():
        # Built fresh on every call so the input and the expected value never
        # share the same dict objects.
        return {
            'properties': {
                'type': {'title': 'Type', 'type': 'string'},
                'items': {'anyOf': [{'$ref': '#/$defs/JsonModel'}, {'type': 'null'}]},
            },
            'required': ['type', 'items'],
            'title': 'JsonModel',
            'type': 'object',
        }

    # Case 1: a plain object schema.
    plain_object = {'type': 'object', 'properties': {'a': {'type': 'string'}}}
    checked_plain = check_object_json_schema(plain_object)
    assert checked_plain == plain_object

    # Case 2: the root is a reference to a (self-referential) definition.
    referencing = {
        '$defs': {'JsonModel': json_model_definition()},
        '$ref': '#/$defs/JsonModel',
    }
    checked_ref = check_object_json_schema(referencing)
    assert checked_ref == json_model_definition()

    # Case 3: anything that is not an object schema is rejected.
    not_an_object = {'type': 'array', 'items': {'type': 'string'}}
    with pytest.raises(UserError, match='^Schema must be an object$'):
        check_object_json_schema(not_an_object)
@pytest.mark.parametrize('peek_first', [True, False])
@pytest.mark.anyio
async def test_peekable_async_stream(peek_first: bool):
    """Walk a `PeekableAsyncStream` over 1, 2, 3, checking `peek` and `is_exhausted`.

    Runs twice: once peeking before iteration starts and once without, so that
    both entry paths into the stream are exercised.
    """
    stream = PeekableAsyncStream(MockAsyncStream(iter([1, 2, 3])))
    seen: list[int] = []

    # Peeking before the first iteration (and not doing so) are both needed for full coverage.
    if peek_first:
        exhausted_before_start = await stream.is_exhausted()
        assert not exhausted_before_start
        first_peek = await stream.peek()
        assert first_peek == 1

    async for current in stream:
        seen.append(current)

        # Mostly here for coverage: after consuming an item, peek should show
        # the following one, or UNSET once the last item (3) has been consumed.
        if current < 3:
            upcoming = current + 1
        else:
            upcoming = UNSET
        assert await stream.peek() == upcoming

    exhausted_after_loop = await stream.is_exhausted()
    assert exhausted_after_loop
    final_peek = await stream.peek()
    assert final_peek is UNSET
    assert seen == [1, 2, 3]
def test_package_versions(capsys: pytest.CaptureFixture[str]):
    """Print the name and version of every installed distribution when `CI` is set.

    Does nothing unless the `CI` environment variable has a non-empty value.
    Output capture is disabled while printing so the listing appears in the
    test run's output.
    """
    if not os.getenv('CI'):
        return

    with capsys.disabled():
        print('\npackage versions:')
        installed = [(dist.metadata['Name'], dist.version) for dist in distributions()]
        installed.sort()
        for name, version in installed:
            print(f'{name:30} {version}')