Coverage for tests/test_utils.py: 98.36%

53 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-28 17:27 +0000

1from __future__ import annotations as _annotations 

2 

3import asyncio 

4import os 

5from collections.abc import AsyncIterator 

6from importlib.metadata import distributions 

7 

8import pytest 

9from inline_snapshot import snapshot 

10 

11from pydantic_ai import UserError 

12from pydantic_ai._utils import UNSET, PeekableAsyncStream, check_object_json_schema, group_by_temporal 

13 

14from .models.mock_async_stream import MockAsyncStream 

15 

16pytestmark = pytest.mark.anyio 

17 

18 

@pytest.mark.parametrize(
    'interval,expected',
    [
        (None, snapshot([[1], [2], [3]])),
        (0, snapshot([[1], [2], [3]])),
        (0.02, snapshot([[1], [2], [3]])),
        (0.04, snapshot([[1, 2], [3]])),
        (0.1, snapshot([[1, 2, 3]])),
    ],
)
async def test_group_by_temporal(interval: float | None, expected: list[list[int]]):
    """Check how `group_by_temporal` batches a slow stream for each `soft_max_interval`.

    The source stream emits 1, 2 and 3, pausing 0.02s after every item; the
    batches collected from the grouping iterator must equal `expected`.
    """

    async def paced_numbers() -> AsyncIterator[int]:
        # Emit each value and then pause, so consecutive items are ~0.02s apart.
        for number in (1, 2, 3):
            yield number
            await asyncio.sleep(0.02)

    async with group_by_temporal(paced_numbers(), soft_max_interval=interval) as groups_iter:
        collected: list[list[int]] = []
        async for batch in groups_iter:
            collected.append(batch)

    assert collected == expected

41 

42 

def test_check_object_json_schema():
    """Exercise `check_object_json_schema` with object, `$ref` and array schemas."""
    # A plain object schema is expected to compare equal to what comes back.
    plain_object = {'type': 'object', 'properties': {'a': {'type': 'string'}}}
    assert check_object_json_schema(plain_object) == plain_object

    # A top-level `$ref` pointing into `$defs` should give the referenced definition.
    json_model_definition = {
        'properties': {
            'type': {'title': 'Type', 'type': 'string'},
            'items': {'anyOf': [{'$ref': '#/$defs/JsonModel'}, {'type': 'null'}]},
        },
        'required': ['type', 'items'],
        'title': 'JsonModel',
        'type': 'object',
    }
    ref_schema = {
        '$defs': {'JsonModel': json_model_definition},
        '$ref': '#/$defs/JsonModel',
    }
    # Deliberately a separate literal so the comparison is by value, not identity.
    expected_resolved = {
        'properties': {
            'type': {'title': 'Type', 'type': 'string'},
            'items': {'anyOf': [{'$ref': '#/$defs/JsonModel'}, {'type': 'null'}]},
        },
        'required': ['type', 'items'],
        'title': 'JsonModel',
        'type': 'object',
    }
    resolved = check_object_json_schema(ref_schema)
    assert resolved == expected_resolved

    # A non-object schema must be rejected with a `UserError`.
    array_schema = {'type': 'array', 'items': {'type': 'string'}}
    with pytest.raises(UserError, match='^Schema must be an object$'):
        check_object_json_schema(array_schema)

74 

75 

@pytest.mark.parametrize('peek_first', [True, False])
@pytest.mark.anyio
async def test_peekable_async_stream(peek_first: bool):
    """Iterate a `PeekableAsyncStream` over 1, 2, 3, peeking along the way.

    `peek_first` controls whether the stream is inspected before iteration
    begins.
    """
    stream = PeekableAsyncStream(MockAsyncStream(iter([1, 2, 3])))
    seen: list[int] = []

    # Both the "peek before iterating" and "iterate straight away" paths are
    # needed to achieve full coverage.
    if peek_first:
        assert not await stream.is_exhausted()
        assert await stream.peek() == 1

    async for item in stream:
        seen.append(item)

        # Included mostly for the sake of coverage: after consuming `item`, the
        # peeked value is the next integer, or UNSET once 3 has been consumed.
        upcoming = item + 1 if item < 3 else UNSET
        assert await stream.peek() == upcoming

    assert await stream.is_exhausted()
    assert await stream.peek() is UNSET
    assert seen == [1, 2, 3]

98 

99 

def test_package_versions(capsys: pytest.CaptureFixture[str]):
    """Print the name and version of every installed distribution when `CI` is set.

    Output is written with capture disabled so it appears in the test log.
    """
    # NOTE(review): the coverage run this listing came from reported the CI
    # condition as always true, so the early-return path was not exercised there.
    if not os.getenv('CI'):
        return

    with capsys.disabled():
        print('\npackage versions:')
        installed = [(dist.metadata['Name'], dist.version) for dist in distributions()]
        installed.sort()
        for name, version in installed:
            print(f'{name:30} {version}')