Coverage for docs_src/security/tutorial005_an.py: 100%
95 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-12-04 08:29 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-12-04 08:29 +0000
1from datetime import datetime, timedelta, timezone 1abcdefg
2from typing import List, Union 1abcdefg
4import jwt 1abcdefg
5from fastapi import Depends, FastAPI, HTTPException, Security, status 1abcdefg
6from fastapi.security import ( 1abcdefg
7 OAuth2PasswordBearer,
8 OAuth2PasswordRequestForm,
9 SecurityScopes,
10)
11from jwt.exceptions import InvalidTokenError 1abcdefg
12from pwdlib import PasswordHash 1abcdefg
13from pydantic import BaseModel, ValidationError 1abcdefg
14from typing_extensions import Annotated 1abcdefg
16# to get a string like this run:
17# openssl rand -hex 32
18SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" 1abcdefg
19ALGORITHM = "HS256" 1abcdefg
20ACCESS_TOKEN_EXPIRE_MINUTES = 30 1abcdefg
23fake_users_db = { 1abcdefg
24 "johndoe": {
25 "username": "johndoe",
26 "full_name": "John Doe",
27 "email": "johndoe@example.com",
28 "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
29 "disabled": False,
30 },
31 "alice": {
32 "username": "alice",
33 "full_name": "Alice Chains",
34 "email": "alicechains@example.com",
35 "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE",
36 "disabled": True,
37 },
38}
41class Token(BaseModel): 1abcdefg
42 access_token: str 1abcdefg
43 token_type: str 1abcdefg
46class TokenData(BaseModel): 1abcdefg
47 username: Union[str, None] = None 1abcdefg
48 scopes: List[str] = [] 1abcdefg
51class User(BaseModel): 1abcdefg
52 username: str 1abcdefg
53 email: Union[str, None] = None 1abcdefg
54 full_name: Union[str, None] = None 1abcdefg
55 disabled: Union[bool, None] = None 1abcdefg
58class UserInDB(User): 1abcdefg
59 hashed_password: str 1abcdefg
62password_hash = PasswordHash.recommended() 1abcdefg
64oauth2_scheme = OAuth2PasswordBearer( 1abcdefg
65 tokenUrl="token",
66 scopes={"me": "Read information about the current user.", "items": "Read items."},
67)
69app = FastAPI() 1abcdefg
72def verify_password(plain_password, hashed_password): 1abcdefg
73 return password_hash.verify(plain_password, hashed_password) 2Q # h C i v D jbR $ j E k w F kbS % l G m x H lbT ' n I o y J mbU ( p K q z L nbV ) r M s A N obW * t O u B P pb
76def get_password_hash(password): 1abcdefg
77 return password_hash.hash(password) 2qbrbsbtbubvbwb
80def get_user(db, username: str): 1abcdefg
81 if username in db: 1Q#=hCivDXYR$?jEkwFZ0S%@lGmxH12T'[nIoyJ34U(]pKqzL56V)^rMsAN78W*_tOuBP9!
82 user_dict = db[username] 1Q#hCivDR$jEkwFS%lGmxHT'nIoyJU(pKqzLV)rMsANW*tOuBP
83 return UserInDB(**user_dict) 1Q#hCivDR$jEkwFS%lGmxHT'nIoyJU(pKqzLV)rMsANW*tOuBP
86def authenticate_user(fake_db, username: str, password: str): 1abcdefg
87 user = get_user(fake_db, username) 1Q#=hCivDR$?jEkwFS%@lGmxHT'[nIoyJU(]pKqzLV)^rMsANW*_tOuBP
88 if not user: 1Q#=hCivDR$?jEkwFS%@lGmxHT'[nIoyJU(]pKqzLV)^rMsANW*_tOuBP
89 return False 1=?@[]^_
90 if not verify_password(password, user.hashed_password): 1Q#hCivDR$jEkwFS%lGmxHT'nIoyJU(pKqzLV)rMsANW*tOuBP
91 return False 1#$%'()*
92 return user 1QhCivDRjEkwFSlGmxHTnIoyJUpKqzLVrMsANWtOuBP
95def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): 1abcdefg
96 to_encode = data.copy() 2cbQ h C i v D dbR j E k w F ebS l G m x H fbT n I o y J gbU p K q z L hbV r M s A N ibW t O u B P
97 if expires_delta: 2cbQ h C i v D dbR j E k w F ebS l G m x H fbT n I o y J gbU p K q z L hbV r M s A N ibW t O u B P
98 expire = datetime.now(timezone.utc) + expires_delta 1QhCivDRjEkwFSlGmxHTnIoyJUpKqzLVrMsANWtOuBP
99 else:
100 expire = datetime.now(timezone.utc) + timedelta(minutes=15) 2cbdbebfbgbhbib
101 to_encode.update({"exp": expire}) 2cbQ h C i v D dbR j E k w F ebS l G m x H fbT n I o y J gbU p K q z L hbV r M s A N ibW t O u B P
102 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) 2cbQ h C i v D dbR j E k w F ebS l G m x H fbT n I o y J gbU p K q z L hbV r M s A N ibW t O u B P
103 return encoded_jwt 2cbQ h C i v D dbR j E k w F ebS l G m x H fbT n I o y J gbU p K q z L hbV r M s A N ibW t O u B P
106async def get_current_user( 1abcdefg
107 security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
108):
109 if security_scopes.scopes: 2` h C i v D + X Y { j E k w F , Z 0 | l G m x H - 1 2 } n I o y J . 3 4 ~ p K q z L / 5 6 abr M s A N : 7 8 bbt O u B P ; 9 !
110 authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' 2` h i v D + X Y { j k w F , Z 0 | l m x H - 1 2 } n o y J . 3 4 ~ p q z L / 5 6 abr s A N : 7 8 bbt u B P ; 9 !
111 else:
112 authenticate_value = "Bearer" 1CEGIKMO
113 credentials_exception = HTTPException( 2` h C i v D + X Y { j E k w F , Z 0 | l G m x H - 1 2 } n I o y J . 3 4 ~ p K q z L / 5 6 abr M s A N : 7 8 bbt O u B P ; 9 !
114 status_code=status.HTTP_401_UNAUTHORIZED,
115 detail="Could not validate credentials",
116 headers={"WWW-Authenticate": authenticate_value},
117 )
118 try: 2` h C i v D + X Y { j E k w F , Z 0 | l G m x H - 1 2 } n I o y J . 3 4 ~ p K q z L / 5 6 abr M s A N : 7 8 bbt O u B P ; 9 !
119 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 2` h C i v D + X Y { j E k w F , Z 0 | l G m x H - 1 2 } n I o y J . 3 4 ~ p K q z L / 5 6 abr M s A N : 7 8 bbt O u B P ; 9 !
120 username = payload.get("sub") 1hCivD+XYjEkwF,Z0lGmxH-12nIoyJ.34pKqzL/56rMsAN:78tOuBP;9!
121 if username is None: 1hCivD+XYjEkwF,Z0lGmxH-12nIoyJ.34pKqzL/56rMsAN:78tOuBP;9!
122 raise credentials_exception 1+,-./:;
123 scope: str = payload.get("scope", "") 1hCivDXYjEkwFZ0lGmxH12nIoyJ34pKqzL56rMsAN78tOuBP9!
124 token_scopes = scope.split(" ") 1hCivDXYjEkwFZ0lGmxH12nIoyJ34pKqzL56rMsAN78tOuBP9!
125 token_data = TokenData(scopes=token_scopes, username=username) 1hCivDXYjEkwFZ0lGmxH12nIoyJ34pKqzL56rMsAN78tOuBP9!
126 except (InvalidTokenError, ValidationError): 2` + { , | - } . ~ / ab: bb;
127 raise credentials_exception 2` { | } ~ abbb
128 user = get_user(fake_users_db, username=token_data.username) 1hCivDXYjEkwFZ0lGmxH12nIoyJ34pKqzL56rMsAN78tOuBP9!
129 if user is None: 1hCivDXYjEkwFZ0lGmxH12nIoyJ34pKqzL56rMsAN78tOuBP9!
130 raise credentials_exception 1XYZ0123456789!
131 for scope in security_scopes.scopes: 1hCivDjEkwFlGmxHnIoyJpKqzLrMsANtOuBP
132 if scope not in token_data.scopes: 1hivDjkwFlmxHnoyJpqzLrsANtuBP
133 raise HTTPException( 1DFHJLNP
134 status_code=status.HTTP_401_UNAUTHORIZED,
135 detail="Not enough permissions",
136 headers={"WWW-Authenticate": authenticate_value},
137 )
138 return user 1hCivjEkwlGmxnIoypKqzrMsAtOuB
141async def get_current_active_user( 1abcdefg
142 current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
143):
144 if current_user.disabled: 1hivjkwlmxnoypqzrsAtuB
145 raise HTTPException(status_code=400, detail="Inactive user") 1vwxyzAB
146 return current_user 1hijklmnopqrstu
149@app.post("/token") 1abcdefg
150async def login_for_access_token( 1abcdefg
151 form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
152) -> Token:
153 user = authenticate_user(fake_users_db, form_data.username, form_data.password) 1Q#=hCivDR$?jEkwFS%@lGmxHT'[nIoyJU(]pKqzLV)^rMsANW*_tOuBP
154 if not user: 1Q#=hCivDR$?jEkwFS%@lGmxHT'[nIoyJU(]pKqzLV)^rMsANW*_tOuBP
155 raise HTTPException(status_code=400, detail="Incorrect username or password") 1#=$?%@'[(])^*_
156 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) 1QhCivDRjEkwFSlGmxHTnIoyJUpKqzLVrMsANWtOuBP
157 access_token = create_access_token( 1QhCivDRjEkwFSlGmxHTnIoyJUpKqzLVrMsANWtOuBP
158 data={"sub": user.username, "scope": " ".join(form_data.scopes)},
159 expires_delta=access_token_expires,
160 )
161 return Token(access_token=access_token, token_type="bearer") 1QhCivDRjEkwFSlGmxHTnIoyJUpKqzLVrMsANWtOuBP
164@app.get("/users/me/", response_model=User) 1abcdefg
165async def read_users_me( 1abcdefg
166 current_user: Annotated[User, Depends(get_current_active_user)],
167):
168 return current_user 1ikmoqsu
171@app.get("/users/me/items/") 1abcdefg
172async def read_own_items( 1abcdefg
173 current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
174):
175 return [{"item_id": "Foo", "owner": current_user.username}] 1hjlnprt
178@app.get("/status/") 1abcdefg
179async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]): 1abcdefg
180 return {"status": "ok"} 1CEGIKMO