Coverage for docs_src / security / tutorial004_py310.py: 100%
83 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-12 18:15 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-12 18:15 +0000
1from datetime import datetime, timedelta, timezone 1abc
3import jwt 1abc
4from fastapi import Depends, FastAPI, HTTPException, status 1abc
5from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm 1abc
6from jwt.exceptions import InvalidTokenError 1abc
7from pwdlib import PasswordHash 1abc
8from pydantic import BaseModel 1abc
10# to get a string like this run:
11# openssl rand -hex 32
12SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" 1abc
13ALGORITHM = "HS256" 1abc
14ACCESS_TOKEN_EXPIRE_MINUTES = 30 1abc
17fake_users_db = { 1abc
18 "johndoe": {
19 "username": "johndoe",
20 "full_name": "John Doe",
21 "email": "johndoe@example.com",
22 "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
23 "disabled": False,
24 }
25}
28class Token(BaseModel): 1abc
29 access_token: str 1abc
30 token_type: str 1abc
33class TokenData(BaseModel): 1abc
34 username: str | None = None 1abc
37class User(BaseModel): 1abc
38 username: str 1abc
39 email: str | None = None 1abc
40 full_name: str | None = None 1abc
41 disabled: bool | None = None 1abc
44class UserInDB(User): 1abc
45 hashed_password: str 1abc
48password_hash = PasswordHash.recommended() 1abc
50DUMMY_HASH = password_hash.hash("dummypassword") 1abc
52oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") 1abc
54app = FastAPI() 1abc
57def verify_password(plain_password, hashed_password): 1abc
58 return password_hash.verify(plain_password, hashed_password) 1mpydefKnqzghiLorAjklM
61def get_password_hash(password): 1abc
62 return password_hash.hash(password) 1NfOiPl
65def get_user(db, username: str): 1abc
66 if username in db: 1mpydefstnqzghiuvorAjklwx
67 user_dict = db[username] 1mpdefnqghiorjkl
68 return UserInDB(**user_dict) 1mpdefnqghiorjkl
71def authenticate_user(fake_db, username: str, password: str): 1abc
72 user = get_user(fake_db, username) 1mpydefnqzghiorAjkl
73 if not user: 1mpydefnqzghiorAjkl
74 verify_password(password, DUMMY_HASH) 1yzA
75 return False 1yzA
76 if not verify_password(password, user.hashed_password): 1mpdefnqghiorjkl
77 return False 1pqr
78 return user 1mdefnghiojkl
81def create_access_token(data: dict, expires_delta: timedelta | None = None): 1abc
82 to_encode = data.copy() 1EmdefFnghiGojkl
83 if expires_delta: 1EmdefFnghiGojkl
84 expire = datetime.now(timezone.utc) + expires_delta 1mdefnghiojkl
85 else:
86 expire = datetime.now(timezone.utc) + timedelta(minutes=15) 1EFG
87 to_encode.update({"exp": expire}) 1EmdefFnghiGojkl
88 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) 1EmdefFnghiGojkl
89 return encoded_jwt 1EmdefFnghiGojkl
92async def get_current_user(token: str = Depends(oauth2_scheme)): 1abc
93 credentials_exception = HTTPException( 1HdefBstIghiCuvJjklDwx
94 status_code=status.HTTP_401_UNAUTHORIZED,
95 detail="Could not validate credentials",
96 headers={"WWW-Authenticate": "Bearer"},
97 )
98 try: 1HdefBstIghiCuvJjklDwx
99 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 1HdefBstIghiCuvJjklDwx
100 username = payload.get("sub") 1defBstghiCuvjklDwx
101 if username is None: 1defBstghiCuvjklDwx
102 raise credentials_exception 1BCD
103 token_data = TokenData(username=username) 1defstghiuvjklwx
104 except InvalidTokenError: 1HBICJD
105 raise credentials_exception 1HIJ
106 user = get_user(fake_users_db, username=token_data.username) 1defstghiuvjklwx
107 if user is None: 1defstghiuvjklwx
108 raise credentials_exception 1stuvwx
109 return user 1defghijkl
112async def get_current_active_user(current_user: User = Depends(get_current_user)): 1abc
113 if current_user.disabled: 1defghijkl
114 raise HTTPException(status_code=400, detail="Inactive user") 1fil
115 return current_user 1deghjk
118@app.post("/token") 1abc
119async def login_for_access_token( 1abc
120 form_data: OAuth2PasswordRequestForm = Depends(),
121) -> Token:
122 user = authenticate_user(fake_users_db, form_data.username, form_data.password) 1mpydefnqzghiorAjkl
123 if not user: 1mpydefnqzghiorAjkl
124 raise HTTPException( 1pyqzrA
125 status_code=status.HTTP_401_UNAUTHORIZED,
126 detail="Incorrect username or password",
127 headers={"WWW-Authenticate": "Bearer"},
128 )
129 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) 1mdefnghiojkl
130 access_token = create_access_token( 1mdefnghiojkl
131 data={"sub": user.username}, expires_delta=access_token_expires
132 )
133 return Token(access_token=access_token, token_type="bearer") 1mdefnghiojkl
136@app.get("/users/me/") 1abc
137async def read_users_me(current_user: User = Depends(get_current_active_user)) -> User: 1abc
138 return current_user 1ehk
141@app.get("/users/me/items/") 1abc
142async def read_own_items(current_user: User = Depends(get_current_active_user)): 1abc
143 return [{"item_id": "Foo", "owner": current_user.username}] 1dgj