Coverage for docs_src/security/tutorial005_an.py: 100%

94 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-01-13 13:38 +0000

1from datetime import datetime, timedelta, timezone 1abcde

2from typing import List, Union 1abcde

3 

4import jwt 1abcde

5from fastapi import Depends, FastAPI, HTTPException, Security, status 1abcde

6from fastapi.security import ( 1abcde

7 OAuth2PasswordBearer, 

8 OAuth2PasswordRequestForm, 

9 SecurityScopes, 

10) 

11from jwt.exceptions import InvalidTokenError 1abcde

12from passlib.context import CryptContext 1abcde

13from pydantic import BaseModel, ValidationError 1abcde

14from typing_extensions import Annotated 1abcde

15 

16# to get a string like this run: 

17# openssl rand -hex 32 

18SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" 1abcde

19ALGORITHM = "HS256" 1abcde

20ACCESS_TOKEN_EXPIRE_MINUTES = 30 1abcde

21 

22 

23fake_users_db = { 1abcde

24 "johndoe": { 

25 "username": "johndoe", 

26 "full_name": "John Doe", 

27 "email": "johndoe@example.com", 

28 "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", 

29 "disabled": False, 

30 }, 

31 "alice": { 

32 "username": "alice", 

33 "full_name": "Alice Chains", 

34 "email": "alicechains@example.com", 

35 "hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm", 

36 "disabled": True, 

37 }, 

38} 

39 

40 

41class Token(BaseModel): 1abcde

42 access_token: str 1abcde

43 token_type: str 1abcde

44 

45 

46class TokenData(BaseModel): 1abcde

47 username: Union[str, None] = None 1abcde

48 scopes: List[str] = [] 1abcde

49 

50 

51class User(BaseModel): 1abcde

52 username: str 1abcde

53 email: Union[str, None] = None 1abcde

54 full_name: Union[str, None] = None 1abcde

55 disabled: Union[bool, None] = None 1abcde

56 

57 

58class UserInDB(User): 1abcde

59 hashed_password: str 1abcde

60 

61 

62pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") 1abcde

63 

64oauth2_scheme = OAuth2PasswordBearer( 1abcde

65 tokenUrl="token", 

66 scopes={"me": "Read information about the current user.", "items": "Read items."}, 

67) 

68 

69app = FastAPI() 1abcde

70 

71 

72def verify_password(plain_password, hashed_password): 1abcde

73 return pwd_context.verify(plain_password, hashed_password) 1ETfugpv+FUhwiqx,GVjykrz-HWlAmsB.IXnCotD/

74 

75 

76def get_password_hash(password): 1abcde

77 return pwd_context.hash(password) 1:;=?@

78 

79 

80def get_user(db, username: str): 1abcde

81 if username in db: 1ET3fugpvJKFU4hwiqxLMGV5jykrzNOHW6lAmsBPQIX7nCotDRS

82 user_dict = db[username] 1ETfugpvFUhwiqxGVjykrzHWlAmsBIXnCotD

83 return UserInDB(**user_dict) 1ETfugpvFUhwiqxGVjykrzHWlAmsBIXnCotD

84 

85 

86def authenticate_user(fake_db, username: str, password: str): 1abcde

87 user = get_user(fake_db, username) 1ET3fugpvFU4hwiqxGV5jykrzHW6lAmsBIX7nCotD

88 if not user: 1ET3fugpvFU4hwiqxGV5jykrzHW6lAmsBIX7nCotD

89 return False 134567

90 if not verify_password(password, user.hashed_password): 1ETfugpvFUhwiqxGVjykrzHWlAmsBIXnCotD

91 return False 1TUVWX

92 return user 1EfugpvFhwiqxGjykrzHlAmsBInCotD

93 

94 

95def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): 1abcde

96 to_encode = data.copy() 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD

97 if expires_delta: 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD

98 expire = datetime.now(timezone.utc) + expires_delta 1EfugpvFhwiqxGjykrzHlAmsBInCotD

99 else: 

100 expire = datetime.now(timezone.utc) + timedelta(minutes=15) 1%'()*

101 to_encode.update({"exp": expire}) 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD

102 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD

103 return encoded_jwt 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD

104 

105 

106async def get_current_user( 1abcde

107 security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)] 

108): 

109 if security_scopes.scopes: 18fugpvYJK9hwiqxZLM!jykrz0NO#lAmsB1PQ$nCotD2RS

110 authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' 18fgpvYJK9hiqxZLM!jkrz0NO#lmsB1PQ$notD2RS

111 else: 

112 authenticate_value = "Bearer" 1uwyAC

113 credentials_exception = HTTPException( 18fugpvYJK9hwiqxZLM!jykrz0NO#lAmsB1PQ$nCotD2RS

114 status_code=status.HTTP_401_UNAUTHORIZED, 

115 detail="Could not validate credentials", 

116 headers={"WWW-Authenticate": authenticate_value}, 

117 ) 

118 try: 18fugpvYJK9hwiqxZLM!jykrz0NO#lAmsB1PQ$nCotD2RS

119 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 18fugpvYJK9hwiqxZLM!jykrz0NO#lAmsB1PQ$nCotD2RS

120 username: str = payload.get("sub") 1fugpvYJKhwiqxZLMjykrz0NOlAmsB1PQnCotD2RS

121 if username is None: 1fugpvYJKhwiqxZLMjykrz0NOlAmsB1PQnCotD2RS

122 raise credentials_exception 1YZ012

123 token_scopes = payload.get("scopes", []) 1fugpvJKhwiqxLMjykrzNOlAmsBPQnCotDRS

124 token_data = TokenData(scopes=token_scopes, username=username) 1fugpvJKhwiqxLMjykrzNOlAmsBPQnCotDRS

125 except (InvalidTokenError, ValidationError): 18Y9Z!0#1$2

126 raise credentials_exception 189!#$

127 user = get_user(fake_users_db, username=token_data.username) 1fugpvJKhwiqxLMjykrzNOlAmsBPQnCotDRS

128 if user is None: 1fugpvJKhwiqxLMjykrzNOlAmsBPQnCotDRS

129 raise credentials_exception 1JKLMNOPQRS

130 for scope in security_scopes.scopes: 1fugpvhwiqxjykrzlAmsBnCotD

131 if scope not in token_data.scopes: 1fgpvhiqxjkrzlmsBnotD

132 raise HTTPException( 1vxzBD

133 status_code=status.HTTP_401_UNAUTHORIZED, 

134 detail="Not enough permissions", 

135 headers={"WWW-Authenticate": authenticate_value}, 

136 ) 

137 return user 1fugphwiqjykrlAmsnCot

138 

139 

140async def get_current_active_user( 1abcde

141 current_user: Annotated[User, Security(get_current_user, scopes=["me"])], 

142): 

143 if current_user.disabled: 1fgphiqjkrlmsnot

144 raise HTTPException(status_code=400, detail="Inactive user") 1pqrst

145 return current_user 1fghijklmno

146 

147 

148@app.post("/token") 1abcde

149async def login_for_access_token( 1abcde

150 form_data: Annotated[OAuth2PasswordRequestForm, Depends()], 

151) -> Token: 

152 user = authenticate_user(fake_users_db, form_data.username, form_data.password) 1ET3fugpvFU4hwiqxGV5jykrzHW6lAmsBIX7nCotD

153 if not user: 1ET3fugpvFU4hwiqxGV5jykrzHW6lAmsBIX7nCotD

154 raise HTTPException(status_code=400, detail="Incorrect username or password") 1T3U4V5W6X7

155 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) 1EfugpvFhwiqxGjykrzHlAmsBInCotD

156 access_token = create_access_token( 1EfugpvFhwiqxGjykrzHlAmsBInCotD

157 data={"sub": user.username, "scopes": form_data.scopes}, 

158 expires_delta=access_token_expires, 

159 ) 

160 return Token(access_token=access_token, token_type="bearer") 1EfugpvFhwiqxGjykrzHlAmsBInCotD

161 

162 

163@app.get("/users/me/", response_model=User) 1abcde

164async def read_users_me( 1abcde

165 current_user: Annotated[User, Depends(get_current_active_user)], 

166): 

167 return current_user 1gikmo

168 

169 

170@app.get("/users/me/items/") 1abcde

171async def read_own_items( 1abcde

172 current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])], 

173): 

174 return [{"item_id": "Foo", "owner": current_user.username}] 1fhjln

175 

176 

177@app.get("/status/") 1abcde

178async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]): 1abcde

179 return {"status": "ok"} 1uwyAC