Coverage for tests/test_utils.py: 98.31%

51 statements  

« prev     ^ index     » next       coverage.py v7.6.7, created at 2025-01-25 16:43 +0000

1from __future__ import annotations as _annotations 

2 

3import asyncio 

4import os 

5from collections.abc import AsyncIterator 

6from importlib.metadata import distributions 

7 

8import pytest 

9from inline_snapshot import snapshot 

10 

11from pydantic_ai import UserError 

12from pydantic_ai._utils import UNSET, PeekableAsyncStream, check_object_json_schema, group_by_temporal 

13 

14from .models.mock_async_stream import MockAsyncStream 

15 

16pytestmark = pytest.mark.anyio 

17 

18 

@pytest.mark.parametrize(
    'interval,expected',
    [
        (None, snapshot([[1], [2], [3]])),
        (0, snapshot([[1], [2], [3]])),
        (0.02, snapshot([[1], [2], [3]])),
        (0.04, snapshot([[1, 2], [3]])),
        (0.1, snapshot([[1, 2, 3]])),
    ],
)
async def test_group_by_temporal(interval: float | None, expected: list[list[int]]):
    """Collect the groups produced by `group_by_temporal` and compare them to `expected`.

    The source stream emits 1, 2 and 3, sleeping 0.02 seconds after each item.
    `interval` is forwarded as `soft_max_interval`; each parametrized case pairs
    an interval with the grouping it is expected to produce.
    """

    async def emit_with_pauses() -> AsyncIterator[int]:
        # Same sequence as three explicit yield/sleep pairs: item first, pause second.
        for value in (1, 2, 3):
            yield value
            await asyncio.sleep(0.02)

    async with group_by_temporal(emit_with_pauses(), soft_max_interval=interval) as batches:
        collected: list[list[int]] = []
        async for batch in batches:
            collected.append(batch)
        assert collected == expected

41 

42 

def test_check_object_json_schema():
    """Exercise `check_object_json_schema` with an object schema and an array schema.

    An object schema must come back equal to the input; an array schema must
    raise `UserError` with the exact message 'Schema must be an object'.
    """
    valid_schema = {'type': 'object', 'properties': {'a': {'type': 'string'}}}
    returned = check_object_json_schema(valid_schema)
    assert returned == valid_schema

    rejected_schema = {'type': 'array', 'items': {'type': 'string'}}
    with pytest.raises(UserError, match='^Schema must be an object$'):
        check_object_json_schema(rejected_schema)

50 

51 

@pytest.mark.parametrize('peek_first', [True, False])
@pytest.mark.anyio
async def test_peekable_async_stream(peek_first: bool):
    """Iterate a `PeekableAsyncStream` over [1, 2, 3], checking `peek()` and `is_exhausted()`.

    When `peek_first` is true the stream is inspected before iteration starts;
    otherwise iteration begins immediately.
    """
    source = MockAsyncStream(iter([1, 2, 3]))
    stream = PeekableAsyncStream(source)

    # Both variants (peeking before the stream is started, and not) are needed for full coverage
    if peek_first:
        assert not await stream.is_exhausted()
        assert await stream.peek() == 1

    seen: list[int] = []
    async for current in stream:
        seen.append(current)

        # Included mostly for the sake of coverage: after the last item the peek yields UNSET
        upcoming = current + 1 if current < 3 else UNSET
        assert await stream.peek() == upcoming

    assert await stream.is_exhausted()
    assert await stream.peek() is UNSET
    assert seen == [1, 2, 3]

74 

75 

def test_package_versions(capsys: pytest.CaptureFixture[str]):
    """Print the name and version of every installed distribution when `CI` is set.

    Output capture is disabled while printing so the listing is not swallowed
    by the capture fixture. Nothing happens when the `CI` environment variable
    is unset or empty.
    """
    if os.getenv('CI'):
        with capsys.disabled():
            print('\npackage versions:')
            installed = [(dist.metadata['Name'], dist.version) for dist in distributions()]
            installed.sort()
            for name, version in installed:
                print(f'{name:30} {version}')