Coverage for docs_src/security/tutorial005_an.py: 100%
94 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-05 00:03 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-05 00:03 +0000
1from datetime import datetime, timedelta, timezone 1abcdef
2from typing import List, Union 1abcdef
4import jwt 1abcdef
5from fastapi import Depends, FastAPI, HTTPException, Security, status 1abcdef
6from fastapi.security import ( 1abcdef
7 OAuth2PasswordBearer,
8 OAuth2PasswordRequestForm,
9 SecurityScopes,
10)
11from jwt.exceptions import InvalidTokenError 1abcdef
12from passlib.context import CryptContext 1abcdef
13from pydantic import BaseModel, ValidationError 1abcdef
14from typing_extensions import Annotated 1abcdef
# Key used to sign and verify the JWTs handled by this module.
# To generate a random value of the same form, run:
#   openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# Algorithm name handed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Lifetime, in minutes, of the tokens issued by the /token endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# In-memory stand-in for a user table: maps each username to its record.
# Every record is keyed by its own "username" field, so the key is derived
# from the record instead of being written out twice.
fake_users_db = {
    record["username"]: record
    for record in (
        {
            "username": "johndoe",
            "full_name": "John Doe",
            "email": "johndoe@example.com",
            "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
            "disabled": False,
        },
        {
            "username": "alice",
            "full_name": "Alice Chains",
            "email": "alicechains@example.com",
            "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
            "disabled": True,
        },
    )
}
# Response payload produced by login_for_access_token.
class Token(BaseModel):
    access_token: str  # the encoded JWT
    token_type: str  # login_for_access_token always sets this to "bearer"
# Claims that get_current_user pulls out of a decoded access token.
class TokenData(BaseModel):
    username: Union[str, None] = None  # filled from the "sub" claim
    scopes: List[str] = []  # filled from the "scopes" claim
# User fields without the password hash; used as the response model of
# the /users/me/ endpoint.
class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    # A truthy value makes get_current_active_user reject the request.
    disabled: Union[bool, None] = None
# Stored user record: the User fields plus the password hash that
# authenticate_user checks submitted passwords against.
class UserInDB(User):
    hashed_password: str
# Password hashing context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Bearer-token scheme: declares "token" as the token URL and lists the
# scopes a token may carry, each with its human-readable description.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={
        "me": "Read information about the current user.",
        "items": "Read items.",
    },
)

# Application instance the route decorators below attach to.
app = FastAPI()
def verify_password(plain_password, hashed_password):
    """Return the result of checking *plain_password* against *hashed_password*.

    The check is delegated to the module-level ``pwd_context``.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password):
    """Hash *password* with the module-level ``pwd_context`` and return the result."""
    hashed = pwd_context.hash(password)
    return hashed
def get_user(db, username: str):
    """Return the ``UserInDB`` built from ``db[username]``, or ``None``.

    ``None`` is returned when *username* is not a key of *db*.
    """
    if username not in db:
        return None
    return UserInDB(**db[username])
def authenticate_user(fake_db, username: str, password: str):
    """Check a username/password pair against *fake_db*.

    Returns the ``UserInDB`` record when *username* exists in *fake_db* and
    *password* matches its stored hash; returns ``False`` otherwise.
    """
    user = get_user(fake_db, username)
    if not user:
        # Without this call the unknown-user path returns before any hashing
        # happens, so response time alone reveals whether a username exists.
        # dummy_verify() spends about as long as a real verification and
        # always reports failure.
        pwd_context.dummy_verify()
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    """Return a signed JWT carrying a copy of *data* plus an ``exp`` claim.

    *data* itself is not modified. ``exp`` is the current UTC time plus
    *expires_delta*, or plus 15 minutes when *expires_delta* is ``None``.
    The token is signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Compare against None rather than relying on truthiness: timedelta(0)
    # is falsy, so a truthiness test would silently replace an explicit
    # zero lifetime with the 15-minute default.
    lifetime = timedelta(minutes=15) if expires_delta is None else expires_delta
    to_encode = data.copy()
    to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    """Resolve the bearer *token* to a user and enforce the required scopes.

    Returns the user record looked up in ``fake_users_db``.

    Raises ``HTTPException`` with status 401 when the token cannot be
    decoded, has no ``sub`` claim, carries claims that fail ``TokenData``
    validation, names a user missing from ``fake_users_db``, or lacks one
    of the scopes in *security_scopes*.
    """
    # The required scopes are echoed in the WWW-Authenticate header only
    # when at least one scope was requested.
    authenticate_value = (
        f'Bearer scope="{security_scopes.scope_str}"'
        if security_scopes.scopes
        else "Bearer"
    )
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(scopes=payload.get("scopes", []), username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception

    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception

    # Every required scope must be present among the token's scopes.
    granted = token_data.scopes
    if any(required not in granted for required in security_scopes.scopes):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not enough permissions",
            headers={"WWW-Authenticate": authenticate_value},
        )
    return user
async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    """Return *current_user* unless its ``disabled`` flag is truthy.

    Raises ``HTTPException`` with status 400 ("Inactive user") for a
    disabled user. The dependency on ``get_current_user`` is declared with
    the ``me`` scope.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
# Login endpoint: validates the submitted form credentials against
# fake_users_db and answers with a bearer token. The token's "sub" claim is
# the username and its "scopes" claim is whatever scopes the form requested.
# Unknown users and wrong passwords both produce the same 400 response.
@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    account = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not account:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    claims = {"sub": account.username, "scopes": form_data.scopes}
    signed_token = create_access_token(
        data=claims,
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return Token(access_token=signed_token, token_type="bearer")
# Returns the authenticated, non-disabled user, serialized through the
# User response model.
@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    active_user = current_user
    return active_user
# Returns a one-element list holding a fixed item owned by the calling
# user. The dependency is declared with the "items" scope.
@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    item = {"item_id": "Foo", "owner": current_user.username}
    return [item]
# Status endpoint: depends on get_current_user without requesting any
# scope here, and does not use the resolved user in the response.
@app.get("/status/")
async def read_system_status(
    current_user: Annotated[User, Depends(get_current_user)],
):
    response = {"status": "ok"}
    return response