Coverage for docs_src/security/tutorial005.py: 100%

93 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-05-06 08:24 +0000

1from datetime import datetime, timedelta, timezone 1abcdef

2from typing import List, Union 1abcdef

3 

4import jwt 1abcdef

5from fastapi import Depends, FastAPI, HTTPException, Security, status 1abcdef

6from fastapi.security import ( 1abcdef

7 OAuth2PasswordBearer, 

8 OAuth2PasswordRequestForm, 

9 SecurityScopes, 

10) 

11from jwt.exceptions import InvalidTokenError 1abcdef

12from passlib.context import CryptContext 1abcdef

13from pydantic import BaseModel, ValidationError 1abcdef

14 

15# to get a string like this run: 

16# openssl rand -hex 32 

17SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" 1abcdef

18ALGORITHM = "HS256" 1abcdef

19ACCESS_TOKEN_EXPIRE_MINUTES = 30 1abcdef

20 

21 

22fake_users_db = { 1abcdef

23 "johndoe": { 

24 "username": "johndoe", 

25 "full_name": "John Doe", 

26 "email": "johndoe@example.com", 

27 "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", 

28 "disabled": False, 

29 }, 

30 "alice": { 

31 "username": "alice", 

32 "full_name": "Alice Chains", 

33 "email": "alicechains@example.com", 

34 "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm", 

35 "disabled": True, 

36 }, 

37} 

38 

39 

40class Token(BaseModel): 1abcdef

41 access_token: str 1abcdef

42 token_type: str 1abcdef

43 

44 

45class TokenData(BaseModel): 1abcdef

46 username: Union[str, None] = None 1abcdef

47 scopes: List[str] = [] 1abcdef

48 

49 

50class User(BaseModel): 1abcdef

51 username: str 1abcdef

52 email: Union[str, None] = None 1abcdef

53 full_name: Union[str, None] = None 1abcdef

54 disabled: Union[bool, None] = None 1abcdef

55 

56 

57class UserInDB(User): 1abcdef

58 hashed_password: str 1abcdef

59 

60 

61pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") 1abcdef

62 

63oauth2_scheme = OAuth2PasswordBearer( 1abcdef

64 tokenUrl="token", 

65 scopes={"me": "Read information about the current user.", "items": "Read items."}, 

66) 

67 

68app = FastAPI() 1abcdef

69 

70 

71def verify_password(plain_password, hashed_password): 1abcdef

72 return pwd_context.verify(plain_password, hashed_password) 2K 2 g y h s z ` L 3 i A j t B { M 4 k C l u D | N 5 m E n v F } O 6 o G p w H ~ P 7 q I r x J ab

73 

74 

75def get_password_hash(password): 1abcdef

76 return pwd_context.hash(password) 2bbcbdbebfbgb

77 

78 

79def get_user(db, username: str): 1abcdef

80 if username in db: 1K2'gyhszQRL3(iAjtBSTM4)kCluDUVN5*mEnvFWXO6+oGpwHYZP7,qIrxJ01

81 user_dict = db[username] 1K2gyhszL3iAjtBM4kCluDN5mEnvFO6oGpwHP7qIrxJ

82 return UserInDB(**user_dict) 1K2gyhszL3iAjtBM4kCluDN5mEnvFO6oGpwHP7qIrxJ

83 

84 

85def authenticate_user(fake_db, username: str, password: str): 1abcdef

86 user = get_user(fake_db, username) 1K2'gyhszL3(iAjtBM4)kCluDN5*mEnvFO6+oGpwHP7,qIrxJ

87 if not user: 1K2'gyhszL3(iAjtBM4)kCluDN5*mEnvFO6+oGpwHP7,qIrxJ

88 return False 1'()*+,

89 if not verify_password(password, user.hashed_password): 1K2gyhszL3iAjtBM4kCluDN5mEnvFO6oGpwHP7qIrxJ

90 return False 1234567

91 return user 1KgyhszLiAjtBMkCluDNmEnvFOoGpwHPqIrxJ

92 

93 

94def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): 1abcdef

95 to_encode = data.copy() 1?Kgyhsz@LiAjtB[MkCluD]NmEnvF^OoGpwH_PqIrxJ

96 if expires_delta: 1?Kgyhsz@LiAjtB[MkCluD]NmEnvF^OoGpwH_PqIrxJ

97 expire = datetime.now(timezone.utc) + expires_delta 1KgyhszLiAjtBMkCluDNmEnvFOoGpwHPqIrxJ

98 else: 

99 expire = datetime.now(timezone.utc) + timedelta(minutes=15) 1?@[]^_

100 to_encode.update({"exp": expire}) 1?Kgyhsz@LiAjtB[MkCluD]NmEnvF^OoGpwH_PqIrxJ

101 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) 1?Kgyhsz@LiAjtB[MkCluD]NmEnvF^OoGpwH_PqIrxJ

102 return encoded_jwt 1?Kgyhsz@LiAjtB[MkCluD]NmEnvF^OoGpwH_PqIrxJ

103 

104 

105async def get_current_user( 1abcdef

106 security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme) 

107): 

108 if security_scopes.scopes: 1-gyhsz8QR.iAjtB9ST/kCluD!UV:mEnvF#WX;oGpwH$YZ=qIrxJ%01

109 authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' 1-ghsz8QR.ijtB9ST/kluD!UV:mnvF#WX;opwH$YZ=qrxJ%01

110 else: 

111 authenticate_value = "Bearer" 1yACEGI

112 credentials_exception = HTTPException( 1-gyhsz8QR.iAjtB9ST/kCluD!UV:mEnvF#WX;oGpwH$YZ=qIrxJ%01

113 status_code=status.HTTP_401_UNAUTHORIZED, 

114 detail="Could not validate credentials", 

115 headers={"WWW-Authenticate": authenticate_value}, 

116 ) 

117 try: 1-gyhsz8QR.iAjtB9ST/kCluD!UV:mEnvF#WX;oGpwH$YZ=qIrxJ%01

118 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 1-gyhsz8QR.iAjtB9ST/kCluD!UV:mEnvF#WX;oGpwH$YZ=qIrxJ%01

119 username: str = payload.get("sub") 1gyhsz8QRiAjtB9STkCluD!UVmEnvF#WXoGpwH$YZqIrxJ%01

120 if username is None: 1gyhsz8QRiAjtB9STkCluD!UVmEnvF#WXoGpwH$YZqIrxJ%01

121 raise credentials_exception 189!#$%

122 token_scopes = payload.get("scopes", []) 1gyhszQRiAjtBSTkCluDUVmEnvFWXoGpwHYZqIrxJ01

123 token_data = TokenData(scopes=token_scopes, username=username) 1gyhszQRiAjtBSTkCluDUVmEnvFWXoGpwHYZqIrxJ01

124 except (InvalidTokenError, ValidationError): 1-8.9/!:#;$=%

125 raise credentials_exception 1-./:;=

126 user = get_user(fake_users_db, username=token_data.username) 1gyhszQRiAjtBSTkCluDUVmEnvFWXoGpwHYZqIrxJ01

127 if user is None: 1gyhszQRiAjtBSTkCluDUVmEnvFWXoGpwHYZqIrxJ01

128 raise credentials_exception 1QRSTUVWXYZ01

129 for scope in security_scopes.scopes: 1gyhsziAjtBkCluDmEnvFoGpwHqIrxJ

130 if scope not in token_data.scopes: 1ghszijtBkluDmnvFopwHqrxJ

131 raise HTTPException( 1zBDFHJ

132 status_code=status.HTTP_401_UNAUTHORIZED, 

133 detail="Not enough permissions", 

134 headers={"WWW-Authenticate": authenticate_value}, 

135 ) 

136 return user 1gyhsiAjtkClumEnvoGpwqIrx

137 

138 

139async def get_current_active_user( 1abcdef

140 current_user: User = Security(get_current_user, scopes=["me"]), 

141): 

142 if current_user.disabled: 1ghsijtklumnvopwqrx

143 raise HTTPException(status_code=400, detail="Inactive user") 1stuvwx

144 return current_user 1ghijklmnopqr

145 

146 

147@app.post("/token") 1abcdef

148async def login_for_access_token( 1abcdef

149 form_data: OAuth2PasswordRequestForm = Depends(), 

150) -> Token: 

151 user = authenticate_user(fake_users_db, form_data.username, form_data.password) 1K2'gyhszL3(iAjtBM4)kCluDN5*mEnvFO6+oGpwHP7,qIrxJ

152 if not user: 1K2'gyhszL3(iAjtBM4)kCluDN5*mEnvFO6+oGpwHP7,qIrxJ

153 raise HTTPException(status_code=400, detail="Incorrect username or password") 12'3(4)5*6+7,

154 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) 1KgyhszLiAjtBMkCluDNmEnvFOoGpwHPqIrxJ

155 access_token = create_access_token( 1KgyhszLiAjtBMkCluDNmEnvFOoGpwHPqIrxJ

156 data={"sub": user.username, "scopes": form_data.scopes}, 

157 expires_delta=access_token_expires, 

158 ) 

159 return Token(access_token=access_token, token_type="bearer") 1KgyhszLiAjtBMkCluDNmEnvFOoGpwHPqIrxJ

160 

161 

162@app.get("/users/me/", response_model=User) 1abcdef

163async def read_users_me(current_user: User = Depends(get_current_active_user)): 1abcdef

164 return current_user 1hjlnpr

165 

166 

167@app.get("/users/me/items/") 1abcdef

168async def read_own_items( 1abcdef

169 current_user: User = Security(get_current_active_user, scopes=["items"]), 

170): 

171 return [{"item_id": "Foo", "owner": current_user.username}] 1gikmoq

172 

173 

174@app.get("/status/") 1abcdef

175async def read_system_status(current_user: User = Depends(get_current_user)): 1abcdef

176 return {"status": "ok"} 1yACEGI