Coverage for docs_src/security/tutorial005.py: 100%
93 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-01-13 13:38 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-01-13 13:38 +0000
1from datetime import datetime, timedelta, timezone 1abcde
2from typing import List, Union 1abcde
4import jwt 1abcde
5from fastapi import Depends, FastAPI, HTTPException, Security, status 1abcde
6from fastapi.security import ( 1abcde
7 OAuth2PasswordBearer,
8 OAuth2PasswordRequestForm,
9 SecurityScopes,
10)
11from jwt.exceptions import InvalidTokenError 1abcde
12from passlib.context import CryptContext 1abcde
13from pydantic import BaseModel, ValidationError 1abcde
15# to get a string like this run:
16# openssl rand -hex 32
17SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" 1abcde
18ALGORITHM = "HS256" 1abcde
19ACCESS_TOKEN_EXPIRE_MINUTES = 30 1abcde
22fake_users_db = { 1abcde
23 "johndoe": {
24 "username": "johndoe",
25 "full_name": "John Doe",
26 "email": "johndoe@example.com",
27 "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
28 "disabled": False,
29 },
30 "alice": {
31 "username": "alice",
32 "full_name": "Alice Chains",
33 "email": "alicechains@example.com",
34 "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
35 "disabled": True,
36 },
37}
40class Token(BaseModel): 1abcde
41 access_token: str 1abcde
42 token_type: str 1abcde
45class TokenData(BaseModel): 1abcde
46 username: Union[str, None] = None 1abcde
47 scopes: List[str] = [] 1abcde
50class User(BaseModel): 1abcde
51 username: str 1abcde
52 email: Union[str, None] = None 1abcde
53 full_name: Union[str, None] = None 1abcde
54 disabled: Union[bool, None] = None 1abcde
57class UserInDB(User): 1abcde
58 hashed_password: str 1abcde
61pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") 1abcde
63oauth2_scheme = OAuth2PasswordBearer( 1abcde
64 tokenUrl="token",
65 scopes={"me": "Read information about the current user.", "items": "Read items."},
66)
68app = FastAPI() 1abcde
71def verify_password(plain_password, hashed_password): 1abcde
72 return pwd_context.verify(plain_password, hashed_password) 1ETfugpv+FUhwiqx,GVjykrz-HWlAmsB.IXnCotD/
75def get_password_hash(password): 1abcde
76 return pwd_context.hash(password) 1:;=?@
79def get_user(db, username: str): 1abcde
80 if username in db: 1ET3fugpvJKFU4hwiqxLMGV5jykrzNOHW6lAmsBPQIX7nCotDRS
81 user_dict = db[username] 1ETfugpvFUhwiqxGVjykrzHWlAmsBIXnCotD
82 return UserInDB(**user_dict) 1ETfugpvFUhwiqxGVjykrzHWlAmsBIXnCotD
85def authenticate_user(fake_db, username: str, password: str): 1abcde
86 user = get_user(fake_db, username) 1ET3fugpvFU4hwiqxGV5jykrzHW6lAmsBIX7nCotD
87 if not user: 1ET3fugpvFU4hwiqxGV5jykrzHW6lAmsBIX7nCotD
88 return False 134567
89 if not verify_password(password, user.hashed_password): 1ETfugpvFUhwiqxGVjykrzHWlAmsBIXnCotD
90 return False 1TUVWX
91 return user 1EfugpvFhwiqxGjykrzHlAmsBInCotD
94def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): 1abcde
95 to_encode = data.copy() 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD
96 if expires_delta: 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD
97 expire = datetime.now(timezone.utc) + expires_delta 1EfugpvFhwiqxGjykrzHlAmsBInCotD
98 else:
99 expire = datetime.now(timezone.utc) + timedelta(minutes=15) 1%'()*
100 to_encode.update({"exp": expire}) 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD
101 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD
102 return encoded_jwt 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD
105async def get_current_user( 1abcde
106 security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
107):
108 if security_scopes.scopes: 18fugpvYJK9hwiqxZLM!jykrz0NO#lAmsB1PQ$nCotD2RS
109 authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' 18fgpvYJK9hiqxZLM!jkrz0NO#lmsB1PQ$notD2RS
110 else:
111 authenticate_value = "Bearer" 1uwyAC
112 credentials_exception = HTTPException( 18fugpvYJK9hwiqxZLM!jykrz0NO#lAmsB1PQ$nCotD2RS
113 status_code=status.HTTP_401_UNAUTHORIZED,
114 detail="Could not validate credentials",
115 headers={"WWW-Authenticate": authenticate_value},
116 )
117 try: 18fugpvYJK9hwiqxZLM!jykrz0NO#lAmsB1PQ$nCotD2RS
118 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 18fugpvYJK9hwiqxZLM!jykrz0NO#lAmsB1PQ$nCotD2RS
119 username: str = payload.get("sub") 1fugpvYJKhwiqxZLMjykrz0NOlAmsB1PQnCotD2RS
120 if username is None: 1fugpvYJKhwiqxZLMjykrz0NOlAmsB1PQnCotD2RS
121 raise credentials_exception 1YZ012
122 token_scopes = payload.get("scopes", []) 1fugpvJKhwiqxLMjykrzNOlAmsBPQnCotDRS
123 token_data = TokenData(scopes=token_scopes, username=username) 1fugpvJKhwiqxLMjykrzNOlAmsBPQnCotDRS
124 except (InvalidTokenError, ValidationError): 18Y9Z!0#1$2
125 raise credentials_exception 189!#$
126 user = get_user(fake_users_db, username=token_data.username) 1fugpvJKhwiqxLMjykrzNOlAmsBPQnCotDRS
127 if user is None: 1fugpvJKhwiqxLMjykrzNOlAmsBPQnCotDRS
128 raise credentials_exception 1JKLMNOPQRS
129 for scope in security_scopes.scopes: 1fugpvhwiqxjykrzlAmsBnCotD
130 if scope not in token_data.scopes: 1fgpvhiqxjkrzlmsBnotD
131 raise HTTPException( 1vxzBD
132 status_code=status.HTTP_401_UNAUTHORIZED,
133 detail="Not enough permissions",
134 headers={"WWW-Authenticate": authenticate_value},
135 )
136 return user 1fugphwiqjykrlAmsnCot
139async def get_current_active_user( 1abcde
140 current_user: User = Security(get_current_user, scopes=["me"]),
141):
142 if current_user.disabled: 1fgphiqjkrlmsnot
143 raise HTTPException(status_code=400, detail="Inactive user") 1pqrst
144 return current_user 1fghijklmno
147@app.post("/token") 1abcde
148async def login_for_access_token( 1abcde
149 form_data: OAuth2PasswordRequestForm = Depends(),
150) -> Token:
151 user = authenticate_user(fake_users_db, form_data.username, form_data.password) 1ET3fugpvFU4hwiqxGV5jykrzHW6lAmsBIX7nCotD
152 if not user: 1ET3fugpvFU4hwiqxGV5jykrzHW6lAmsBIX7nCotD
153 raise HTTPException(status_code=400, detail="Incorrect username or password") 1T3U4V5W6X7
154 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) 1EfugpvFhwiqxGjykrzHlAmsBInCotD
155 access_token = create_access_token( 1EfugpvFhwiqxGjykrzHlAmsBInCotD
156 data={"sub": user.username, "scopes": form_data.scopes},
157 expires_delta=access_token_expires,
158 )
159 return Token(access_token=access_token, token_type="bearer") 1EfugpvFhwiqxGjykrzHlAmsBInCotD
162@app.get("/users/me/", response_model=User) 1abcde
163async def read_users_me(current_user: User = Depends(get_current_active_user)): 1abcde
164 return current_user 1gikmo
167@app.get("/users/me/items/") 1abcde
168async def read_own_items( 1abcde
169 current_user: User = Security(get_current_active_user, scopes=["items"]),
170):
171 return [{"item_id": "Foo", "owner": current_user.username}] 1fhjln
174@app.get("/status/") 1abcde
175async def read_system_status(current_user: User = Depends(get_current_user)): 1abcde
176 return {"status": "ok"} 1uwyAC