Coverage for docs_src / security / tutorial004_an_py310.py: 100%

84 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-12 18:15 +0000

1from datetime import datetime, timedelta, timezone 1abc

2from typing import Annotated 1abc

3 

4import jwt 1abc

5from fastapi import Depends, FastAPI, HTTPException, status 1abc

6from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm 1abc

7from jwt.exceptions import InvalidTokenError 1abc

8from pwdlib import PasswordHash 1abc

9from pydantic import BaseModel 1abc

10 

# Key used to sign and verify the JWTs handled by this module.
# To generate a string like this one, run:
#   openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"

# Algorithm name handed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"

# Lifetime, in minutes, of the tokens issued by the /token endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

16 

17 

# In-memory stand-in for a user table: maps a username to the keyword
# arguments that get_user passes to UserInDB.
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        "disabled": False,
    },
}

27 

28 

# Response model of the /token endpoint.
# Documented with comments rather than a docstring: pydantic copies a model's
# docstring into the generated JSON schema description.
class Token(BaseModel):
    # Encoded JWT produced by create_access_token.
    access_token: str
    # Token scheme name; login_for_access_token sets it to "bearer".
    token_type: str

32 

33 

# Data taken from a decoded token by get_current_user.
# Documented with comments rather than a docstring: pydantic copies a model's
# docstring into the generated JSON schema description.
class TokenData(BaseModel):
    # Value of the token's "sub" claim; optional, defaulting to None.
    username: str | None = None

36 

37 

# User fields exposed by the API (response model of /users/me/).
# Documented with comments rather than a docstring: pydantic copies a model's
# docstring into the generated JSON schema description.
class User(BaseModel):
    # Only required field.
    username: str
    # Optional profile fields; each defaults to None.
    email: str | None = None
    full_name: str | None = None
    # A truthy value makes get_current_active_user reject the request.
    disabled: bool | None = None

43 

44 

# Stored form of a user: every User field plus the password hash.
# Documented with comments rather than a docstring: pydantic copies a model's
# docstring into the generated JSON schema description.
class UserInDB(User):
    # Hash that authenticate_user checks the submitted password against.
    hashed_password: str

47 

48 

# Password hasher built from pwdlib's recommended configuration.
password_hash = PasswordHash.recommended()

# Hash of a throwaway password, computed once at import time.
# authenticate_user verifies against it when the username is unknown.
DUMMY_HASH = password_hash.hash("dummypassword")

# Bearer-token dependency; "token" is the URL where clients obtain a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()

56 

57 

def verify_password(plain_password, hashed_password):
    """Check ``plain_password`` against ``hashed_password``.

    Delegates to the module-level ``password_hash`` and returns whatever its
    ``verify`` call returns.
    """
    is_match = password_hash.verify(plain_password, hashed_password)
    return is_match

60 

61 

def get_password_hash(password):
    """Return the hash of ``password`` produced by the module-level hasher."""
    hashed = password_hash.hash(password)
    return hashed

64 

65 

def get_user(db, username: str):
    """Build a ``UserInDB`` from the record stored under ``username``.

    Args:
        db: Mapping of username to a dict of ``UserInDB`` field values.
        username: Key to look up in ``db``.

    Returns:
        The ``UserInDB`` for that record, or ``None`` when ``db`` has no
        entry for ``username``.
    """
    if username not in db:
        return None
    record = db[username]
    return UserInDB(**record)

70 

71 

def authenticate_user(fake_db, username: str, password: str):
    """Return the user matching ``username``/``password``, or ``False``.

    Args:
        fake_db: Mapping passed straight through to ``get_user``.
        username: Name of the account to authenticate.
        password: Plain-text password to verify.

    Returns:
        The user object returned by ``get_user`` when the password verifies
        against its ``hashed_password``; ``False`` for an unknown username or
        a wrong password.
    """
    account = get_user(fake_db, username)
    if account:
        if verify_password(password, account.hashed_password):
            return account
        return False
    # Unknown username: a verification is still run against DUMMY_HASH and
    # its result discarded -- presumably so this path costs about as much as
    # a real check (NOTE(review): confirm the timing intent).
    verify_password(password, DUMMY_HASH)
    return False

80 

81 

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Encode ``data`` plus an expiry claim as a signed JWT.

    Args:
        data: Claims to embed. It is shallow-copied, so the caller's dict is
            not modified.
        expires_delta: Token lifetime measured from the current UTC time.
            ``None`` (the default) selects a 15-minute lifetime.

    Returns:
        The token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so a plain
    # truthiness test would silently swap a requested zero lifetime for the
    # 15-minute default.
    if expires_delta is not None:
        lifetime = expires_delta
    else:
        lifetime = timedelta(minutes=15)
    to_encode = data.copy()
    to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

91 

92 

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the bearer ``token`` to a user from ``fake_users_db``.

    Args:
        token: Token string supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The user that ``get_user`` finds for the token's ``"sub"`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token fails to decode, carries no ``"sub"`` claim, or names
            a user that is not in ``fake_users_db``.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    # Only the decode call can raise InvalidTokenError, so it alone sits in
    # the try block.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError:
        raise credentials_exception
    username = payload.get("sub")
    if username is None:
        raise credentials_exception
    token_data = TokenData(username=username)
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

111 

112 

async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """Return ``current_user`` unless the account is disabled.

    Raises:
        HTTPException: 400 "Inactive user" when ``current_user.disabled`` is
            truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")

119 

120 

@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    # Documented with comments rather than a docstring: FastAPI publishes a
    # path operation's docstring as its OpenAPI description.
    #
    # Exchanges the form's username/password for a bearer token whose "sub"
    # claim is the username and whose lifetime is ACCESS_TOKEN_EXPIRE_MINUTES.
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        # authenticate_user returns False for unknown users and for wrong
        # passwords alike; both get the same 401.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    token = create_access_token(
        data={"sub": user.username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return Token(access_token=token, token_type="bearer")

137 

138 

@app.get("/users/me/")
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> User:
    # Documented with comments rather than a docstring: FastAPI publishes a
    # path operation's docstring as its OpenAPI description.
    #
    # Echoes back the user resolved by the get_current_active_user dependency.
    authenticated_user = current_user
    return authenticated_user

144 

145 

@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    # Documented with comments rather than a docstring: FastAPI publishes a
    # path operation's docstring as its OpenAPI description.
    #
    # Returns a one-element list holding a fixed item owned by the caller.
    owned_item = {"item_id": "Foo", "owner": current_user.username}
    return [owned_item]