Coverage for tests / test_dependency_after_yield_streaming.py: 100%

80 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-12 18:15 +0000

1from collections.abc import Generator 1abcd

2from contextlib import contextmanager 1abcd

3from typing import Annotated, Any 1abcd

4 

5import pytest 1abcd

6from fastapi import Depends, FastAPI 1abcd

7from fastapi.responses import StreamingResponse 1abcd

8from fastapi.testclient import TestClient 1abcd

9 

10 

class Session:
    """Toy session holding a fixed list of items that is readable only while open."""

    def __init__(self) -> None:
        # A new session starts out open and pre-populated.
        self.open = True
        self.data = ["foo", "bar", "baz"]

    def __iter__(self) -> Generator[str, None, None]:
        """Yield the stored items one by one.

        The ``open`` flag is checked again before every item, so a session
        that gets closed part-way through iteration fails on the next step.

        Raises:
            ValueError: if the session is closed when an item is requested.
        """
        for entry in self.data:
            if not self.open:
                raise ValueError("Session closed")
            yield entry

22 

23 

@contextmanager
def acquire_session() -> Generator[Session, None, None]:
    """Context manager that hands out a fresh ``Session`` and closes it on exit."""
    fresh = Session()
    try:
        yield fresh
    finally:
        # Mark the session closed whether the with-body finished or raised.
        fresh.open = False

31 

32 

def dep_session() -> Any:
    """Yield-style dependency providing a session from ``acquire_session``.

    The session is yielded from inside the ``with`` block, so it is only
    marked closed once this generator is resumed or closed.
    """
    with acquire_session() as session:
        yield session

36 

37 

def broken_dep_session() -> Any:
    """Yield-style dependency providing a session that is already closed."""
    with acquire_session() as session:
        # Close it before handing it out so any iteration raises ValueError.
        session.open = False
        yield session

42 

43 

# Annotated aliases used as parameter types by the path operations below:
# one resolves to a usable session, the other to an already-closed one.
SessionDep = Annotated[Session, Depends(dep_session)]
BrokenSessionDep = Annotated[Session, Depends(broken_dep_session)]

# Application under test.
app = FastAPI()

48 

49 

@app.get("/data")
def get_data(session: SessionDep) -> Any:
    """Return all session items as a regular (non-streaming) response."""
    return list(session)

54 

55 

@app.get("/stream-simple")
def get_stream_simple(session: SessionDep) -> Any:
    """Stream three fixed chunks; the session is injected but never read."""

    def chunks():
        for chunk in ["x", "y", "z"]:
            yield chunk

    return StreamingResponse(chunks())

62 

63 

@app.get("/stream-session")
def get_stream_session(session: SessionDep) -> Any:
    """Stream the session's items; the session is read lazily while streaming."""

    def session_items():
        yield from session

    return StreamingResponse(session_items())

70 

71 

@app.get("/broken-session-data")
def get_broken_session_data(session: BrokenSessionDep) -> Any:
    """Materialize the items of an already-closed session.

    Iterating the closed session raises ValueError before a response is built.
    """
    items = list(session)
    return items

75 

76 

@app.get("/broken-session-stream")
def get_broken_session_stream(session: BrokenSessionDep) -> Any:
    """Stream from an already-closed session.

    The generator is only consumed while the response is being sent, so the
    ValueError from the closed session is raised at that point, not here.
    """

    def session_items():
        yield from session

    return StreamingResponse(session_items())

83 

84 

# Shared client for the tests below that do not construct their own.
client = TestClient(app)

86 

87 

def test_regular_no_stream():
    """The non-streaming endpoint returns every session item as JSON."""
    assert client.get("/data").json() == ["foo", "bar", "baz"]

91 

92 

def test_stream_simple():
    """A stream that never touches the session delivers its fixed chunks."""
    body = client.get("/stream-simple").text
    assert body == "xyz"

96 

97 

def test_stream_session():
    """All session items arrive, so the session was still open while streaming."""
    streamed = client.get("/stream-session")
    assert streamed.text == "foobarbaz"

101 

102 

def test_broken_session_data():
    """Reading a closed session in a regular endpoint re-raises the ValueError."""
    expect_closed = pytest.raises(ValueError, match="Session closed")
    with expect_closed:
        client.get("/broken-session-data")

106 

107 

def test_broken_session_data_no_raise():
    """With server exceptions suppressed, the same failure becomes a plain 500."""
    # Local client with its own name, so the module-level one is not shadowed.
    quiet_client = TestClient(app, raise_server_exceptions=False)
    response = quiet_client.get("/broken-session-data")
    assert response.status_code == 500
    assert response.text == "Internal Server Error"

113 

114 

def test_broken_session_stream_raise():
    """Streaming from a closed session propagates an error to the test client."""
    # Can raise ValueError on Pydantic v2 and ExceptionGroup on Pydantic v1
    expected_errors = (ValueError, Exception)
    with pytest.raises(expected_errors):
        client.get("/broken-session-stream")

119 

120 

def test_broken_session_stream_no_raise():
    """
    Once a streaming response has started, the 200 status code has already
    been sent, so an error raised afterwards cannot change it. The client
    sees a 200 with an empty body, while the exception is still raised on the
    server side and captured or shown in the server logs.
    """
    with TestClient(app, raise_server_exceptions=False) as quiet_client:
        response = quiet_client.get("/broken-session-stream")
        assert response.status_code == 200
        assert response.text == ""

130 assert response.text == "" 1efg