Coverage for docs_src / security / tutorial005_an_py310.py: 100%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-12 18:15 +0000

1from datetime import datetime, timedelta, timezone 1abc

2from typing import Annotated 1abc

3 

4import jwt 1abc

5from fastapi import Depends, FastAPI, HTTPException, Security, status 1abc

6from fastapi.security import ( 1abc

7 OAuth2PasswordBearer, 

8 OAuth2PasswordRequestForm, 

9 SecurityScopes, 

10) 

11from jwt.exceptions import InvalidTokenError 1abc

12from pwdlib import PasswordHash 1abc

13from pydantic import BaseModel, ValidationError 1abc

14 

# Key used to sign (jwt.encode) and verify (jwt.decode) the access tokens below.
# To get a string like this run:
#     openssl rand -hex 32
# NOTE(review): the key is committed in source; presumably acceptable only
# because this is example code - load it from configuration in a deployment.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# Algorithm passed to jwt.encode and the only one accepted by jwt.decode.
ALGORITHM = "HS256"
# Token lifetime, in minutes, applied by the /token endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

20 

21 

# In-memory stand-in for a user store, keyed by username. Each record carries
# the fields UserInDB is built from in get_user(). "alice" is flagged as
# disabled, so get_current_active_user() rejects her with "Inactive user".
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Chains",
        "email": "alicechains@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$g2/AV1zwopqUntPKJavBFw$BwpRGDCyUHLvHICnwijyX8ROGoiUPwNKZ7915MeYfCE",
        "disabled": True,
    },
}

38 

39 

# Response model of the /token endpoint.
# (Comment rather than docstring so the generated JSON schema gains no description.)
class Token(BaseModel):
    # Encoded JWT produced by create_access_token().
    access_token: str
    # The login endpoint always sets this to "bearer".
    token_type: str

43 

44 

# Claims extracted from a decoded access token inside get_current_user().
# (Comment rather than docstring so the generated JSON schema gains no description.)
class TokenData(BaseModel):
    # Value of the token's "sub" claim.
    username: str | None = None
    # Scopes parsed from the token's space-separated "scope" claim.
    # NOTE(review): mutable default - presumably safe because pydantic copies
    # field defaults per instance; confirm against the pydantic version in use.
    scopes: list[str] = []

48 

49 

# Public view of a user; this is the declared return type of /users/me/.
# (Comment rather than docstring so the generated JSON schema gains no description.)
class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    # Checked by get_current_active_user(); a truthy value blocks the request.
    disabled: bool | None = None

55 

56 

# Stored form of a user: the User fields plus the password hash that
# authenticate_user() verifies against. Instances are built by get_user().
# (Comment rather than docstring so the generated JSON schema gains no description.)
class UserInDB(User):
    hashed_password: str

59 

60 

# Hasher shared by verify_password() and get_password_hash().
password_hash = PasswordHash.recommended()

# Hash of a throwaway password, computed once at import time.
# authenticate_user() verifies against it when the username is unknown.
DUMMY_HASH = password_hash.hash("dummypassword")

# Bearer-token scheme used as the dependency that supplies `token` to
# get_current_user(). It points clients at the "token" URL and declares the
# two scopes ("me", "items") that the endpoints below require.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"me": "Read information about the current user.", "items": "Read items."},
)

# Application object that the route decorators below attach to.
app = FastAPI()

71 

72 

def verify_password(plain_password, hashed_password):
    """Report whether ``plain_password`` matches ``hashed_password``.

    Delegates to the module-level ``password_hash`` object and returns its
    verdict unchanged.
    """
    matches = password_hash.verify(plain_password, hashed_password)
    return matches

75 

76 

def get_password_hash(password):
    """Return the hash of ``password`` as produced by the module-level ``password_hash``."""
    hashed = password_hash.hash(password)
    return hashed

79 

80 

def get_user(db, username: str):
    """Look up ``username`` in ``db`` and return the record as a ``UserInDB``.

    Args:
        db: Mapping from username to a dict of ``UserInDB`` field values.
        username: Key to look up.

    Returns:
        A ``UserInDB`` built from the stored record, or ``None`` when
        ``username`` is not a key of ``db``.
    """
    # Single lookup instead of a membership test followed by indexing.
    try:
        user_dict = db[username]
    except KeyError:
        # Unknown user: callers test the result for None / falsiness.
        return None
    return UserInDB(**user_dict)

85 

86 

def authenticate_user(fake_db, username: str, password: str):
    """Check a username/password pair against ``fake_db``.

    Returns:
        The matching ``UserInDB`` when the password verifies, otherwise
        ``False`` (unknown username or wrong password).
    """
    account = get_user(fake_db, username)
    if account:
        if verify_password(password, account.hashed_password):
            return account
        return False
    # Unknown username: still run one verification against DUMMY_HASH and
    # discard the result - presumably so that unknown and known usernames take
    # a similar amount of time to reject.
    verify_password(password, DUMMY_HASH)
    return False

95 

96 

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object is
            not modified.
        expires_delta: Lifetime of the token, counted from the current UTC
            time. When omitted (``None``) the token lasts 15 minutes.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Compare against None rather than relying on truthiness: timedelta(0) is
    # falsy, so a truthiness test would silently replace an explicit zero
    # lifetime with the 15-minute default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    to_encode = data.copy()
    to_encode.update({"exp": datetime.now(timezone.utc) + expires_delta})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

106 

107 

async def get_current_user(
    security_scopes: SecurityScopes, token: Annotated[str, Depends(oauth2_scheme)]
):
    """Resolve the bearer ``token`` to a user and enforce the required scopes.

    Args:
        security_scopes: Scopes required by the dependants of this dependency.
        token: Encoded JWT supplied through ``oauth2_scheme``.

    Returns:
        The ``UserInDB`` named by the token's ``sub`` claim.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            cannot be decoded, has no ``sub`` claim, fails ``TokenData``
            validation, or names a user missing from ``fake_users_db``;
            401 "Not enough permissions" when a required scope is absent
            from the token.
    """
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        scope_claim: str = payload.get("scope", "")
        # split() without a separator returns [] for an empty claim and ignores
        # repeated spaces; split(" ") would produce [""] / empty entries.
        token_scopes = scope_claim.split()
        token_data = TokenData(scopes=token_scopes, username=username)
    except (InvalidTokenError, ValidationError):
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    # Every scope demanded along the dependency chain must be in the token.
    for required_scope in security_scopes.scopes:
        if required_scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user

141 

142 

async def get_current_active_user(
    current_user: Annotated[User, Security(get_current_user, scopes=["me"])],
):
    """Pass the authenticated user through unless the account is disabled.

    Depends on ``get_current_user`` with the "me" scope.

    Raises:
        HTTPException: 400 "Inactive user" when ``current_user.disabled`` is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")

149 

150 

@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    # Password login: check the submitted credentials against fake_users_db
    # and, on success, issue a bearer token whose "scope" claim lists the
    # scopes requested in the form. Bad credentials produce a 400 response.
    # Kept as a comment, not a docstring: FastAPI would publish a docstring as
    # this operation's OpenAPI description.
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    claims = {"sub": user.username, "scope": " ".join(form_data.scopes)}
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    signed_token = create_access_token(data=claims, expires_delta=lifetime)
    return Token(access_token=signed_token, token_type="bearer")

164 

165 

@app.get("/users/me/")
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> User:
    # Return the authenticated, non-disabled user resolved by the
    # get_current_active_user dependency.
    # Kept as a comment, not a docstring: FastAPI would publish a docstring as
    # this operation's OpenAPI description.
    return current_user

171 

172 

@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Security(get_current_active_user, scopes=["items"])],
):
    # Return a one-element list holding a fixed item owned by the caller.
    # Requires the "items" scope on top of what get_current_active_user needs.
    # Kept as a comment, not a docstring: FastAPI would publish a docstring as
    # this operation's OpenAPI description.
    item = {"item_id": "Foo", "owner": current_user.username}
    return [item]

178 

179 

@app.get("/status/")
async def read_system_status(current_user: Annotated[User, Depends(get_current_user)]):
    # Fixed status payload. The dependency is declared only so the request
    # must carry a valid token; the resolved user is not used, and no scope is
    # requested here.
    # Kept as a comment, not a docstring: FastAPI would publish a docstring as
    # this operation's OpenAPI description.
    return {"status": "ok"}