Coverage for docs_src / security / tutorial005_py310.py: 100%
95 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-12 18:15 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-12 18:15 +0000
1from datetime import datetime, timedelta, timezone 1abc
3import jwt 1abc
4from fastapi import Depends, FastAPI, HTTPException, Security, status 1abc
5from fastapi.security import ( 1abc
6 OAuth2PasswordBearer,
7 OAuth2PasswordRequestForm,
8 SecurityScopes,
9)
10from jwt.exceptions import InvalidTokenError 1abc
11from pwdlib import PasswordHash 1abc
12from pydantic import BaseModel, ValidationError 1abc
# Settings used when signing and verifying the JWT access tokens.
#
# NOTE(review): SECRET_KEY is a hard-coded example value. Anything beyond a
# demo should load it from the environment or a secret store instead.
# A fresh random key of the same form can be generated with:
#     openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"  # handed to jwt.encode() / jwt.decode()
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # lifetime of tokens issued by POST /token
# In-memory stand-in for a user table, keyed by username.
# Every record carries the fields that UserInDB is built from; the
# "hashed_password" values are argon2id hash strings.
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        "disabled": False,
    },
    # Disabled account: get_current_active_user rejects it with "Inactive user".
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE",
        "disabled": True,
    },
}
class Token(BaseModel):
    # Response body returned by the POST /token endpoint.
    # (Kept as comments: a class docstring would show up in the JSON schema.)
    access_token: str  # the encoded JWT
    token_type: str  # the login endpoint always sets this to "bearer"
class TokenData(BaseModel):
    # Claims extracted from a decoded access token by get_current_user.
    username: str | None = None  # taken from the "sub" claim
    scopes: list[str] = []  # taken from the space-separated "scope" claim
class User(BaseModel):
    # Public view of an account; used as the response model of /users/me/.
    # (Kept as comments: a class docstring would show up in the JSON schema.)
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None  # truthy -> rejected by get_current_active_user
class UserInDB(User):
    # Stored form of an account: the User fields plus the password hash that
    # authenticate_user verifies submitted passwords against.
    hashed_password: str
# Hasher shared by verify_password() and get_password_hash().
password_hash = PasswordHash.recommended()

# Hash that authenticate_user() verifies against when the username is unknown,
# so that a password verification is performed on that path as well.
DUMMY_HASH = password_hash.hash("dummypassword")

# Bearer-token scheme; tokens are obtained from the "token" URL and may carry
# the scopes declared here.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={
        "me": "Read information about the current user.",
        "items": "Read items.",
    },
)

app = FastAPI()
def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password``.

    Delegates to the module-level ``password_hash`` and returns whatever its
    ``verify`` call returns.
    """
    is_match = password_hash.verify(plain_password, hashed_password)
    return is_match
def get_password_hash(password):
    """Hash ``password`` with the module-level ``password_hash`` and return it."""
    hashed = password_hash.hash(password)
    return hashed
def get_user(db, username: str):
    """Look up ``username`` in ``db``.

    Returns a ``UserInDB`` built from the stored record, or ``None`` when the
    username is not a key of ``db``.
    """
    if username not in db:
        return None
    record = db[username]
    return UserInDB(**record)
def authenticate_user(fake_db, username: str, password: str):
    """Validate a username/password pair against ``fake_db``.

    Returns the matching ``UserInDB`` on success and ``False`` when the user
    does not exist or the password does not verify.
    """
    account = get_user(fake_db, username)
    if account:
        password_ok = verify_password(password, account.hashed_password)
        return account if password_ok else False
    # Unknown user: still run one verification (result discarded) against
    # DUMMY_HASH -- presumably so this path costs about as much as a
    # wrong-password attempt and does not reveal which usernames exist.
    verify_password(password, DUMMY_HASH)
    return False
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Encode ``data`` as a signed JWT carrying an ``exp`` claim.

    Args:
        data: Claims to put in the token. The dict is not modified; the
            claims are copied before ``exp`` is added.
        expires_delta: Lifetime of the token, counted from the current UTC
            time. When ``None`` the token expires after 15 minutes.

    Returns:
        The value returned by ``jwt.encode`` using ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Compare with None explicitly: timedelta(0) is falsy, so a plain
    # truthiness test would silently replace a requested zero lifetime with
    # the 15-minute default.
    lifetime = timedelta(minutes=15) if expires_delta is None else expires_delta
    claims = {**data, "exp": datetime.now(timezone.utc) + lifetime}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    """Resolve the bearer ``token`` to a user and enforce the required scopes.

    Args:
        security_scopes: Scopes required by the dependants of this dependency.
        token: Encoded JWT extracted by ``oauth2_scheme``.

    Returns:
        The ``UserInDB`` found in ``fake_users_db`` for the token's ``sub``
        claim.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            cannot be decoded, has no ``sub`` claim, has a malformed ``scope``
            claim, fails ``TokenData`` validation, or names an unknown user;
            401 "Not enough permissions" when a required scope is missing
            from the token.
    """
    # The WWW-Authenticate header advertises the required scopes, if any.
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        raw_scope = payload.get("scope", "")
        # A non-string scope claim would otherwise fail on .split() with an
        # AttributeError (a 500); treat it as an invalid token instead.
        if not isinstance(raw_scope, str):
            raise credentials_exception
        # split() without a separator gives [] for an empty claim and drops
        # empty entries caused by repeated spaces; split(" ") would give [""].
        token_data = TokenData(scopes=raw_scope.split(), username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    for required_scope in security_scopes.scopes:
        if required_scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user
async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"]),
):
    """Return the authenticated user unless the account is disabled.

    Requires the "me" scope through ``get_current_user``.

    Raises:
        HTTPException: 400 "Inactive user" when ``current_user.disabled`` is
            truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    # Password login: exchange form credentials for a bearer JWT.
    # (Plain comments on purpose: a docstring here would become the
    # operation's description in the generated OpenAPI schema.)
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    # NOTE(review): the scopes requested in the form are copied into the token
    # as-is; nothing here checks that this user may hold them.
    claims = {"sub": user.username, "scope": " ".join(form_data.scopes)}
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    jwt_token = create_access_token(data=claims, expires_delta=lifetime)
    return Token(access_token=jwt_token, token_type="bearer")
@app.get("/users/me/")
async def read_users_me(
    current_user: User = Depends(get_current_active_user),
) -> User:
    # Echo back the active user resolved by the dependency chain.
    return current_user
@app.get("/users/me/items/")
async def read_own_items(
    current_user: User = Security(get_current_active_user, scopes=["items"]),
):
    # Requires the "items" scope on top of whatever get_current_active_user
    # itself demands. Returns a single hard-coded item owned by the caller.
    owned_item = {"item_id": "Foo", "owner": current_user.username}
    return [owned_item]
@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
    # Depends directly on get_current_user with no scopes declared here, so
    # the user is resolved from the token but not otherwise used.
    status_payload = {"status": "ok"}
    return status_payload