Coverage for docs_src/security/tutorial005_py310.py: 100%
92 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-05 00:03 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-05 00:03 +0000
1from datetime import datetime, timedelta, timezone 1abcd
3import jwt 1abcd
4from fastapi import Depends, FastAPI, HTTPException, Security, status 1abcd
5from fastapi.security import ( 1abcd
6 OAuth2PasswordBearer,
7 OAuth2PasswordRequestForm,
8 SecurityScopes,
9)
10from jwt.exceptions import InvalidTokenError 1abcd
11from passlib.context import CryptContext 1abcd
12from pydantic import BaseModel, ValidationError 1abcd
14# to get a string like this run:
15# openssl rand -hex 32
16SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" 1abcd
17ALGORITHM = "HS256" 1abcd
18ACCESS_TOKEN_EXPIRE_MINUTES = 30 1abcd
21fake_users_db = { 1abcd
22 "johndoe": {
23 "username": "johndoe",
24 "full_name": "John Doe",
25 "email": "johndoe@example.com",
26 "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
27 "disabled": False,
28 },
29 "alice": {
30 "username": "alice",
31 "full_name": "Alice Chains",
32 "email": "alicechains@example.com",
33 "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
34 "disabled": True,
35 },
36}
39class Token(BaseModel): 1abcd
40 access_token: str 1abcd
41 token_type: str 1abcd
44class TokenData(BaseModel): 1abcd
45 username: str | None = None 1abcd
46 scopes: list[str] = [] 1abcd
49class User(BaseModel): 1abcd
50 username: str 1abcd
51 email: str | None = None 1abcd
52 full_name: str | None = None 1abcd
53 disabled: bool | None = None 1abcd
56class UserInDB(User): 1abcd
57 hashed_password: str 1abcd
60pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") 1abcd
62oauth2_scheme = OAuth2PasswordBearer( 1abcd
63 tokenUrl="token",
64 scopes={"me": "Read information about the current user.", "items": "Read items."},
65)
67app = FastAPI() 1abcd
70def verify_password(plain_password, hashed_password): 1abcd
71 return pwd_context.verify(plain_password, hashed_password) 1yKeqfmr4zLgshnt5AMiujov6BNkwlpx7
74def get_password_hash(password): 1abcd
75 return pwd_context.hash(password) 189!#
78def get_user(db, username: str): 1abcd
79 if username in db: 1yKSeqfmrCDzLTgshntEFAMUiujovGHBNVkwlpxIJ
80 user_dict = db[username] 1yKeqfmrzLgshntAMiujovBNkwlpx
81 return UserInDB(**user_dict) 1yKeqfmrzLgshntAMiujovBNkwlpx
84def authenticate_user(fake_db, username: str, password: str): 1abcd
85 user = get_user(fake_db, username) 1yKSeqfmrzLTgshntAMUiujovBNVkwlpx
86 if not user: 1yKSeqfmrzLTgshntAMUiujovBNVkwlpx
87 return False 1STUV
88 if not verify_password(password, user.hashed_password): 1yKeqfmrzLgshntAMiujovBNkwlpx
89 return False 1KLMN
90 return user 1yeqfmrzgshntAiujovBkwlpx
93def create_access_token(data: dict, expires_delta: timedelta | None = None): 1abcd
94 to_encode = data.copy() 10yeqfmr1zgshnt2Aiujov3Bkwlpx
95 if expires_delta: 10yeqfmr1zgshnt2Aiujov3Bkwlpx
96 expire = datetime.now(timezone.utc) + expires_delta 1yeqfmrzgshntAiujovBkwlpx
97 else:
98 expire = datetime.now(timezone.utc) + timedelta(minutes=15) 10123
99 to_encode.update({"exp": expire}) 10yeqfmr1zgshnt2Aiujov3Bkwlpx
100 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) 10yeqfmr1zgshnt2Aiujov3Bkwlpx
101 return encoded_jwt 10yeqfmr1zgshnt2Aiujov3Bkwlpx
104async def get_current_user( 1abcd
105 security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
106):
107 if security_scopes.scopes: 1WeqfmrOCDXgshntPEFYiujovQGHZkwlpxRIJ
108 authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' 1WefmrOCDXghntPEFYijovQGHZklpxRIJ
109 else:
110 authenticate_value = "Bearer" 1qsuw
111 credentials_exception = HTTPException( 1WeqfmrOCDXgshntPEFYiujovQGHZkwlpxRIJ
112 status_code=status.HTTP_401_UNAUTHORIZED,
113 detail="Could not validate credentials",
114 headers={"WWW-Authenticate": authenticate_value},
115 )
116 try: 1WeqfmrOCDXgshntPEFYiujovQGHZkwlpxRIJ
117 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 1WeqfmrOCDXgshntPEFYiujovQGHZkwlpxRIJ
118 username: str = payload.get("sub") 1eqfmrOCDgshntPEFiujovQGHkwlpxRIJ
119 if username is None: 1eqfmrOCDgshntPEFiujovQGHkwlpxRIJ
120 raise credentials_exception 1OPQR
121 token_scopes = payload.get("scopes", []) 1eqfmrCDgshntEFiujovGHkwlpxIJ
122 token_data = TokenData(scopes=token_scopes, username=username) 1eqfmrCDgshntEFiujovGHkwlpxIJ
123 except (InvalidTokenError, ValidationError): 1WOXPYQZR
124 raise credentials_exception 1WXYZ
125 user = get_user(fake_users_db, username=token_data.username) 1eqfmrCDgshntEFiujovGHkwlpxIJ
126 if user is None: 1eqfmrCDgshntEFiujovGHkwlpxIJ
127 raise credentials_exception 1CDEFGHIJ
128 for scope in security_scopes.scopes: 1eqfmrgshntiujovkwlpx
129 if scope not in token_data.scopes: 1efmrghntijovklpx
130 raise HTTPException( 1rtvx
131 status_code=status.HTTP_401_UNAUTHORIZED,
132 detail="Not enough permissions",
133 headers={"WWW-Authenticate": authenticate_value},
134 )
135 return user 1eqfmgshniujokwlp
138async def get_current_active_user( 1abcd
139 current_user: User = Security(get_current_user, scopes=["me"]),
140):
141 if current_user.disabled: 1efmghnijoklp
142 raise HTTPException(status_code=400, detail="Inactive user") 1mnop
143 return current_user 1efghijkl
146@app.post("/token") 1abcd
147async def login_for_access_token( 1abcd
148 form_data: OAuth2PasswordRequestForm = Depends(),
149) -> Token:
150 user = authenticate_user(fake_users_db, form_data.username, form_data.password) 1yKSeqfmrzLTgshntAMUiujovBNVkwlpx
151 if not user: 1yKSeqfmrzLTgshntAMUiujovBNVkwlpx
152 raise HTTPException(status_code=400, detail="Incorrect username or password") 1KSLTMUNV
153 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) 1yeqfmrzgshntAiujovBkwlpx
154 access_token = create_access_token( 1yeqfmrzgshntAiujovBkwlpx
155 data={"sub": user.username, "scopes": form_data.scopes},
156 expires_delta=access_token_expires,
157 )
158 return Token(access_token=access_token, token_type="bearer") 1yeqfmrzgshntAiujovBkwlpx
161@app.get("/users/me/", response_model=User) 1abcd
162async def read_users_me(current_user: User = Depends(get_current_active_user)): 1abcd
163 return current_user 1fhjl
166@app.get("/users/me/items/") 1abcd
167async def read_own_items( 1abcd
168 current_user: User = Security(get_current_active_user, scopes=["items"]),
169):
170 return [{"item_id": "Foo", "owner": current_user.username}] 1egik
173@app.get("/status/") 1abcd
174async def read_system_status(current_user: User = Depends(get_current_user)): 1abcd
175 return {"status": "ok"} 1qsuw