Coverage for tests/test_dependency_after_yield_streaming.py: 100%
80 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-12 18:15 +0000
from collections.abc import Generator
from contextlib import contextmanager
from typing import Annotated, Any

import pytest
from fastapi import Depends, FastAPI
from fastapi.responses import StreamingResponse
from fastapi.testclient import TestClient
class Session:
    """Minimal stand-in for a resource that can only be read while it is open.

    Iterating yields the stored items as long as ``open`` is true and raises
    ``ValueError("Session closed")`` as soon as it is not.
    """

    def __init__(self) -> None:
        # Fixed payload that the endpoints return or stream.
        self.data = ["foo", "bar", "baz"]
        # Cleared by whoever owns the session once it must no longer be read.
        self.open = True

    def __iter__(self) -> Generator[str, None, None]:
        """Yield each stored item, re-checking ``open`` before every item.

        Raises:
            ValueError: if the session is closed when the next item is requested.
        """
        for item in self.data:
            if self.open:
                yield item
            else:
                raise ValueError("Session closed")
@contextmanager
def acquire_session() -> Generator[Session, None, None]:
    """Create a ``Session``, hand it to the ``with`` body, and close it on exit.

    The ``finally`` clause sets ``session.open = False`` whether the body
    finishes normally or raises.
    """
    session = Session()
    try:
        yield session
    finally:
        session.open = False
def dep_session() -> Any:
    """Generator dependency that yields a session obtained from ``acquire_session``.

    The ``with`` block is left (and the session closed) only when the
    generator is resumed or closed after the ``yield``.
    """
    with acquire_session() as s:
        yield s
def broken_dep_session() -> Any:
    """Generator dependency that yields a session which is already closed.

    ``open`` is cleared before the ``yield``, so iterating the yielded
    session raises ``ValueError("Session closed")`` on the first item.
    """
    with acquire_session() as s:
        # Deliberately close the session up front to provoke the error path.
        s.open = False
        yield s
# Annotated aliases so endpoints can declare each dependency as a parameter type.
SessionDep = Annotated[Session, Depends(dep_session)]
BrokenSessionDep = Annotated[Session, Depends(broken_dep_session)]

# Application under test; the routes below are registered on it.
app = FastAPI()
@app.get("/data")
def get_data(session: SessionDep) -> Any:
    # Non-streaming case: the session is fully consumed inside the handler,
    # before the value is returned.
    data = list(session)
    return data
@app.get("/stream-simple")
def get_stream_simple(session: SessionDep) -> Any:
    # Streaming case that declares the session dependency but never reads it:
    # the body comes from a fixed list.
    def iter_data():
        yield from ["x", "y", "z"]

    return StreamingResponse(iter_data())
@app.get("/stream-session")
def get_stream_session(session: SessionDep) -> Any:
    # Streaming case that reads the session lazily: items are pulled from it
    # only while the response body is being produced, after this handler
    # has returned.
    def iter_data():
        yield from session

    return StreamingResponse(iter_data())
@app.get("/broken-session-data")
def get_broken_session_data(session: BrokenSessionDep) -> Any:
    # The broken dependency yields a closed session, so list() raises
    # ValueError("Session closed") inside the handler.
    return list(session)
@app.get("/broken-session-stream")
def get_broken_session_stream(session: BrokenSessionDep) -> Any:
    # The closed session is only iterated while the body is streamed, so the
    # ValueError is raised from the generator, not from this handler.
    def iter_data():
        yield from session

    return StreamingResponse(iter_data())
# Shared client with default settings; tests that need different settings
# build their own TestClient.
client = TestClient(app)
def test_regular_no_stream():
    """A regular JSON endpoint can read the whole session inside the handler."""
    response = client.get("/data")
    assert response.json() == ["foo", "bar", "baz"]
def test_stream_simple():
    """A streaming endpoint that ignores its session streams its fixed chunks."""
    response = client.get("/stream-simple")
    assert response.text == "xyz"
def test_stream_session():
    """The session must still be readable while the response body is streamed."""
    response = client.get("/stream-session")
    assert response.text == "foobarbaz"
def test_broken_session_data():
    """With the default client, the handler's ValueError reaches the test."""
    with pytest.raises(ValueError, match="Session closed"):
        client.get("/broken-session-data")
def test_broken_session_data_no_raise():
    """With ``raise_server_exceptions=False`` the error becomes a plain 500."""
    # Local client on purpose: it shadows the module-level one, which uses
    # the default settings.
    client = TestClient(app, raise_server_exceptions=False)
    response = client.get("/broken-session-data")
    assert response.status_code == 500
    assert response.text == "Internal Server Error"
def test_broken_session_stream_raise():
    """An error raised while streaming reaches the test with the default client."""
    # Can raise ValueError on Pydantic v2 and ExceptionGroup on Pydantic v1.
    # ValueError is already an Exception, so the tuple only spells out both
    # expected shapes; any Exception satisfies it.
    with pytest.raises((ValueError, Exception)):
        client.get("/broken-session-stream")
def test_broken_session_stream_no_raise():
    """
    When a dependency with yield raises after the streaming response has
    already started, the 200 status code has already been sent. The error
    still happens on the server afterwards: the exception is raised and then
    captured or shown in the server logs.
    """
    with TestClient(app, raise_server_exceptions=False) as client:
        response = client.get("/broken-session-stream")
        assert response.status_code == 200
        # The closed session fails on its first item, so no body is sent.
        assert response.text == ""