Coverage for docs_src/security/tutorial005_py310.py: 100%

93 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-12-04 08:29 +0000

1from datetime import datetime, timedelta, timezone 1abcde

2 

3import jwt 1abcde

4from fastapi import Depends, FastAPI, HTTPException, Security, status 1abcde

5from fastapi.security import ( 1abcde

6 OAuth2PasswordBearer, 

7 OAuth2PasswordRequestForm, 

8 SecurityScopes, 

9) 

10from jwt.exceptions import InvalidTokenError 1abcde

11from pwdlib import PasswordHash 1abcde

12from pydantic import BaseModel, ValidationError 1abcde

13 

14# to get a string like this run: 

15# openssl rand -hex 32 

16SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" 1abcde

17ALGORITHM = "HS256" 1abcde

18ACCESS_TOKEN_EXPIRE_MINUTES = 30 1abcde

19 

20 

21fake_users_db = { 1abcde

22 "johndoe": { 

23 "username": "johndoe", 

24 "full_name": "John Doe", 

25 "email": "johndoe@example.com", 

26 "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc", 

27 "disabled": False, 

28 }, 

29 "alice": { 

30 "username": "alice", 

31 "full_name": "Alice Chains", 

32 "email": "alicechains@example.com", 

33 "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE", 

34 "disabled": True, 

35 }, 

36} 

37 

38 

39class Token(BaseModel): 1abcde

40 access_token: str 1abcde

41 token_type: str 1abcde

42 

43 

44class TokenData(BaseModel): 1abcde

45 username: str | None = None 1abcde

46 scopes: list[str] = [] 1abcde

47 

48 

49class User(BaseModel): 1abcde

50 username: str 1abcde

51 email: str | None = None 1abcde

52 full_name: str | None = None 1abcde

53 disabled: bool | None = None 1abcde

54 

55 

56class UserInDB(User): 1abcde

57 hashed_password: str 1abcde

58 

59 

60password_hash = PasswordHash.recommended() 1abcde

61 

62oauth2_scheme = OAuth2PasswordBearer( 1abcde

63 tokenUrl="token", 

64 scopes={"me": "Read information about the current user.", "items": "Read items."}, 

65) 

66 

67app = FastAPI() 1abcde

68 

69 

70def verify_password(plain_password, hashed_password): 1abcde

71 return password_hash.verify(plain_password, hashed_password) 1ETfugpv+FUhwiqx,GVjykrz-HWlAmsB.IXnCotD/

72 

73 

74def get_password_hash(password): 1abcde

75 return password_hash.hash(password) 1:;=?@

76 

77 

78def get_user(db, username: str): 1abcde

79 if username in db: 1ET3fugpvJKFU4hwiqxLMGV5jykrzNOHW6lAmsBPQIX7nCotDRS

80 user_dict = db[username] 1ETfugpvFUhwiqxGVjykrzHWlAmsBIXnCotD

81 return UserInDB(**user_dict) 1ETfugpvFUhwiqxGVjykrzHWlAmsBIXnCotD

82 

83 

84def authenticate_user(fake_db, username: str, password: str): 1abcde

85 user = get_user(fake_db, username) 1ET3fugpvFU4hwiqxGV5jykrzHW6lAmsBIX7nCotD

86 if not user: 1ET3fugpvFU4hwiqxGV5jykrzHW6lAmsBIX7nCotD

87 return False 134567

88 if not verify_password(password, user.hashed_password): 1ETfugpvFUhwiqxGVjykrzHWlAmsBIXnCotD

89 return False 1TUVWX

90 return user 1EfugpvFhwiqxGjykrzHlAmsBInCotD

91 

92 

93def create_access_token(data: dict, expires_delta: timedelta | None = None): 1abcde

94 to_encode = data.copy() 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD

95 if expires_delta: 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD

96 expire = datetime.now(timezone.utc) + expires_delta 1EfugpvFhwiqxGjykrzHlAmsBInCotD

97 else: 

98 expire = datetime.now(timezone.utc) + timedelta(minutes=15) 1%'()*

99 to_encode.update({"exp": expire}) 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD

100 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD

101 return encoded_jwt 1%Efugpv'Fhwiqx(Gjykrz)HlAmsB*InCotD

102 

103 

104async def get_current_user( 1abcde

105 security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme) 

106): 

107 if security_scopes.scopes: 18fugpvYJK9hwiqxZLM!jykrz0NO#lAmsB1PQ$nCotD2RS

108 authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' 18fgpvYJK9hiqxZLM!jkrz0NO#lmsB1PQ$notD2RS

109 else: 

110 authenticate_value = "Bearer" 1uwyAC

111 credentials_exception = HTTPException( 18fugpvYJK9hwiqxZLM!jykrz0NO#lAmsB1PQ$nCotD2RS

112 status_code=status.HTTP_401_UNAUTHORIZED, 

113 detail="Could not validate credentials", 

114 headers={"WWW-Authenticate": authenticate_value}, 

115 ) 

116 try: 18fugpvYJK9hwiqxZLM!jykrz0NO#lAmsB1PQ$nCotD2RS

117 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 18fugpvYJK9hwiqxZLM!jykrz0NO#lAmsB1PQ$nCotD2RS

118 username: str = payload.get("sub") 1fugpvYJKhwiqxZLMjykrz0NOlAmsB1PQnCotD2RS

119 if username is None: 1fugpvYJKhwiqxZLMjykrz0NOlAmsB1PQnCotD2RS

120 raise credentials_exception 1YZ012

121 scope: str = payload.get("scope", "") 1fugpvJKhwiqxLMjykrzNOlAmsBPQnCotDRS

122 token_scopes = scope.split(" ") 1fugpvJKhwiqxLMjykrzNOlAmsBPQnCotDRS

123 token_data = TokenData(scopes=token_scopes, username=username) 1fugpvJKhwiqxLMjykrzNOlAmsBPQnCotDRS

124 except (InvalidTokenError, ValidationError): 18Y9Z!0#1$2

125 raise credentials_exception 189!#$

126 user = get_user(fake_users_db, username=token_data.username) 1fugpvJKhwiqxLMjykrzNOlAmsBPQnCotDRS

127 if user is None: 1fugpvJKhwiqxLMjykrzNOlAmsBPQnCotDRS

128 raise credentials_exception 1JKLMNOPQRS

129 for scope in security_scopes.scopes: 1fugpvhwiqxjykrzlAmsBnCotD

130 if scope not in token_data.scopes: 1fgpvhiqxjkrzlmsBnotD

131 raise HTTPException( 1vxzBD

132 status_code=status.HTTP_401_UNAUTHORIZED, 

133 detail="Not enough permissions", 

134 headers={"WWW-Authenticate": authenticate_value}, 

135 ) 

136 return user 1fugphwiqjykrlAmsnCot

137 

138 

139async def get_current_active_user( 1abcde

140 current_user: User = Security(get_current_user, scopes=["me"]), 

141): 

142 if current_user.disabled: 1fgphiqjkrlmsnot

143 raise HTTPException(status_code=400, detail="Inactive user") 1pqrst

144 return current_user 1fghijklmno

145 

146 

147@app.post("/token") 1abcde

148async def login_for_access_token( 1abcde

149 form_data: OAuth2PasswordRequestForm = Depends(), 

150) -> Token: 

151 user = authenticate_user(fake_users_db, form_data.username, form_data.password) 1ET3fugpvFU4hwiqxGV5jykrzHW6lAmsBIX7nCotD

152 if not user: 1ET3fugpvFU4hwiqxGV5jykrzHW6lAmsBIX7nCotD

153 raise HTTPException(status_code=400, detail="Incorrect username or password") 1T3U4V5W6X7

154 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) 1EfugpvFhwiqxGjykrzHlAmsBInCotD

155 access_token = create_access_token( 1EfugpvFhwiqxGjykrzHlAmsBInCotD

156 data={"sub": user.username, "scope": " ".join(form_data.scopes)}, 

157 expires_delta=access_token_expires, 

158 ) 

159 return Token(access_token=access_token, token_type="bearer") 1EfugpvFhwiqxGjykrzHlAmsBInCotD

160 

161 

162@app.get("/users/me/", response_model=User) 1abcde

163async def read_users_me(current_user: User = Depends(get_current_active_user)): 1abcde

164 return current_user 1gikmo

165 

166 

167@app.get("/users/me/items/") 1abcde

168async def read_own_items( 1abcde

169 current_user: User = Security(get_current_active_user, scopes=["items"]), 

170): 

171 return [{"item_id": "Foo", "owner": current_user.username}] 1fhjln

172 

173 

174@app.get("/status/") 1abcde

175async def read_system_status(current_user: User = Depends(get_current_user)): 1abcde

176 return {"status": "ok"} 1uwyAC