Coverage for docs_src / security / tutorial004_an_py310.py: 100%
84 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-12 18:15 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-12 18:15 +0000
1from datetime import datetime, timedelta, timezone 1abc
2from typing import Annotated 1abc
4import jwt 1abc
5from fastapi import Depends, FastAPI, HTTPException, status 1abc
6from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm 1abc
7from jwt.exceptions import InvalidTokenError 1abc
8from pwdlib import PasswordHash 1abc
9from pydantic import BaseModel 1abc
11# to get a string like this run:
12# openssl rand -hex 32
13SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" 1abc
14ALGORITHM = "HS256" 1abc
15ACCESS_TOKEN_EXPIRE_MINUTES = 30 1abc
18fake_users_db = { 1abc
19 "johndoe": {
20 "username": "johndoe",
21 "full_name": "John Doe",
22 "email": "johndoe@example.com",
23 "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
24 "disabled": False,
25 }
26}
29class Token(BaseModel): 1abc
30 access_token: str 1abc
31 token_type: str 1abc
34class TokenData(BaseModel): 1abc
35 username: str | None = None 1abc
38class User(BaseModel): 1abc
39 username: str 1abc
40 email: str | None = None 1abc
41 full_name: str | None = None 1abc
42 disabled: bool | None = None 1abc
45class UserInDB(User): 1abc
46 hashed_password: str 1abc
49password_hash = PasswordHash.recommended() 1abc
51DUMMY_HASH = password_hash.hash("dummypassword") 1abc
53oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") 1abc
55app = FastAPI() 1abc
58def verify_password(plain_password, hashed_password): 1abc
59 return password_hash.verify(plain_password, hashed_password) 1mpydefKnqzghiLorAjklM
62def get_password_hash(password): 1abc
63 return password_hash.hash(password) 1NfOiPl
66def get_user(db, username: str): 1abc
67 if username in db: 1mpydefstnqzghiuvorAjklwx
68 user_dict = db[username] 1mpdefnqghiorjkl
69 return UserInDB(**user_dict) 1mpdefnqghiorjkl
72def authenticate_user(fake_db, username: str, password: str): 1abc
73 user = get_user(fake_db, username) 1mpydefnqzghiorAjkl
74 if not user: 1mpydefnqzghiorAjkl
75 verify_password(password, DUMMY_HASH) 1yzA
76 return False 1yzA
77 if not verify_password(password, user.hashed_password): 1mpdefnqghiorjkl
78 return False 1pqr
79 return user 1mdefnghiojkl
82def create_access_token(data: dict, expires_delta: timedelta | None = None): 1abc
83 to_encode = data.copy() 1EmdefFnghiGojkl
84 if expires_delta: 1EmdefFnghiGojkl
85 expire = datetime.now(timezone.utc) + expires_delta 1mdefnghiojkl
86 else:
87 expire = datetime.now(timezone.utc) + timedelta(minutes=15) 1EFG
88 to_encode.update({"exp": expire}) 1EmdefFnghiGojkl
89 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) 1EmdefFnghiGojkl
90 return encoded_jwt 1EmdefFnghiGojkl
93async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): 1abc
94 credentials_exception = HTTPException( 1HdefBstIghiCuvJjklDwx
95 status_code=status.HTTP_401_UNAUTHORIZED,
96 detail="Could not validate credentials",
97 headers={"WWW-Authenticate": "Bearer"},
98 )
99 try: 1HdefBstIghiCuvJjklDwx
100 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 1HdefBstIghiCuvJjklDwx
101 username = payload.get("sub") 1defBstghiCuvjklDwx
102 if username is None: 1defBstghiCuvjklDwx
103 raise credentials_exception 1BCD
104 token_data = TokenData(username=username) 1defstghiuvjklwx
105 except InvalidTokenError: 1HBICJD
106 raise credentials_exception 1HIJ
107 user = get_user(fake_users_db, username=token_data.username) 1defstghiuvjklwx
108 if user is None: 1defstghiuvjklwx
109 raise credentials_exception 1stuvwx
110 return user 1defghijkl
113async def get_current_active_user( 1abc
114 current_user: Annotated[User, Depends(get_current_user)],
115):
116 if current_user.disabled: 1defghijkl
117 raise HTTPException(status_code=400, detail="Inactive user") 1fil
118 return current_user 1deghjk
121@app.post("/token") 1abc
122async def login_for_access_token( 1abc
123 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
124) -> Token:
125 user = authenticate_user(fake_users_db, form_data.username, form_data.password) 1mpydefnqzghiorAjkl
126 if not user: 1mpydefnqzghiorAjkl
127 raise HTTPException( 1pyqzrA
128 status_code=status.HTTP_401_UNAUTHORIZED,
129 detail="Incorrect username or password",
130 headers={"WWW-Authenticate": "Bearer"},
131 )
132 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) 1mdefnghiojkl
133 access_token = create_access_token( 1mdefnghiojkl
134 data={"sub": user.username}, expires_delta=access_token_expires
135 )
136 return Token(access_token=access_token, token_type="bearer") 1mdefnghiojkl
139@app.get("/users/me/") 1abc
140async def read_users_me( 1abc
141 current_user: Annotated[User, Depends(get_current_active_user)],
142) -> User:
143 return current_user 1ehk
146@app.get("/users/me/items/") 1abc
147async def read_own_items( 1abc
148 current_user: Annotated[User, Depends(get_current_active_user)],
149):
150 return [{"item_id": "Foo", "owner": current_user.username}] 1dgj