Coverage for docs_src / security / tutorial004_py310.py: 100%

83 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-12 18:15 +0000

1from datetime import datetime, timedelta, timezone 1abc

2 

3import jwt 1abc

4from fastapi import Depends, FastAPI, HTTPException, status 1abc

5from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm 1abc

6from jwt.exceptions import InvalidTokenError 1abc

7from pwdlib import PasswordHash 1abc

8from pydantic import BaseModel 1abc

9 

# Key used to sign and verify the JWT access tokens handled by this module.
# To generate a string of the same shape, run:
#     openssl rand -hex 32
# NOTE(review): the key is hard-coded in source; presumably only acceptable
# for demonstration purposes - confirm before reusing this module.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# Algorithm name handed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Lifetime, in minutes, of the tokens issued by the /token endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

15 

16 

# In-memory stand-in for a user table: maps a username to the raw record
# that get_user() expands into a UserInDB.
fake_users_db = {
    "johndoe": dict(
        username="johndoe",
        full_name="John Doe",
        email="johndoe@example.com",
        hashed_password="$argon2id$v=19$m=65536,t=3,p=4$wagCPXjifgvUFBzq4hqe3w$CYaIb8sB+wtD+Vu/P4uod1+Qof8h+1g7bbDlBID48Rc",
        disabled=False,
    ),
}

26 

27 

# Response body returned by the /token endpoint.
class Token(BaseModel):
    # The encoded JWT produced by create_access_token().
    access_token: str
    # Token scheme label; the /token endpoint sets it to "bearer".
    token_type: str

31 

32 

# Holder for the value get_current_user() reads from a decoded token.
class TokenData(BaseModel):
    # Filled from the token's "sub" claim; defaults to None.
    username: str | None = None

35 

36 

# User fields without the password hash; UserInDB extends this model.
class User(BaseModel):
    # The only required field.
    username: str
    # Optional profile fields, each defaulting to None.
    email: str | None = None
    full_name: str | None = None
    # A truthy value makes get_current_active_user() reject the user.
    disabled: bool | None = None

42 

43 

# Stored form of a user: every User field plus the password hash.
class UserInDB(User):
    # Hash that authenticate_user() verifies the submitted password against.
    hashed_password: str

46 

47 

# Shared password hasher, built from pwdlib's recommended configuration.
password_hash = PasswordHash.recommended()

# Hash of a throwaway password, computed once at import time.
# authenticate_user() verifies against it when the username is unknown.
DUMMY_HASH = password_hash.hash("dummypassword")

# Bearer-token scheme used as a dependency by get_current_user();
# its tokenUrl is the relative path "token".
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# The application object the route decorators below attach to.
app = FastAPI()

55 

56 

def verify_password(plain_password, hashed_password):
    """Check *plain_password* against *hashed_password*.

    Delegates to the module-level ``password_hash`` hasher and returns
    whatever its ``verify`` call returns.
    """
    outcome = password_hash.verify(plain_password, hashed_password)
    return outcome

59 

60 

def get_password_hash(password):
    """Return the hash of *password* from the module-level ``password_hash``."""
    digest = password_hash.hash(password)
    return digest

63 

64 

def get_user(db, username: str):
    """Fetch the record stored under *username* in *db* as a ``UserInDB``.

    Returns ``None`` when *db* holds no entry for *username*.
    """
    if username not in db:
        return None
    record = db[username]
    return UserInDB(**record)

69 

70 

def authenticate_user(fake_db, username: str, password: str):
    """Validate a username/password pair against *fake_db*.

    Returns the matching ``UserInDB`` when the password verifies, and
    ``False`` when the user is unknown or the password does not match.
    """
    account = get_user(fake_db, username)
    if account:
        if verify_password(password, account.hashed_password):
            return account
        return False
    # Unknown user: a verification is still run against DUMMY_HASH and its
    # result discarded - presumably so that this path costs about as much
    # as a real check and does not reveal which usernames exist.
    verify_password(password, DUMMY_HASH)
    return False

79 

80 

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Encode *data* as a signed JWT carrying an ``exp`` claim.

    Args:
        data: Claims to embed. The mapping is copied, so the caller's
            object is not modified.
        expires_delta: Lifetime of the token counted from the current UTC
            time. ``None`` selects the default of 15 minutes.

    Returns:
        The value returned by ``jwt.encode`` for the claims, signed with
        ``SECRET_KEY`` using ``ALGORITHM``.
    """
    to_encode = data.copy()
    # Compare with None explicitly: timedelta(0) is falsy, so a truthiness
    # test would silently swap an explicit zero lifetime for the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=15)
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

90 

91 

async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the bearer *token* to the user stored in ``fake_users_db``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token fails to decode, has no ``sub`` claim, or names a
            user that is not in ``fake_users_db``.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError:
        raise unauthorized
    subject = claims.get("sub")
    if subject is None:
        raise unauthorized
    token_data = TokenData(username=subject)
    account = get_user(fake_users_db, username=token_data.username)
    if account is None:
        raise unauthorized
    return account

110 

111 

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Return *current_user* unless its ``disabled`` flag is truthy.

    Raises:
        HTTPException: 400 "Inactive user" when the user is disabled.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")

116 

117 

# POST /token: exchange form-submitted credentials for a bearer token.
# Responds 401 "Incorrect username or password" when authentication fails.
@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if user:
        # The username becomes the token's "sub" claim, which
        # get_current_user() reads back on later requests.
        signed_token = create_access_token(
            data={"sub": user.username},
            expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
        )
        return Token(access_token=signed_token, token_type="bearer")
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or password",
        headers={"WWW-Authenticate": "Bearer"},
    )

134 

135 

# GET /users/me/: return the user resolved by get_current_active_user().
@app.get("/users/me/")
async def read_users_me(current_user: User = Depends(get_current_active_user)) -> User:
    # The dependency has already rejected missing, invalid and disabled
    # users, so the resolved user is returned unchanged.
    return current_user

139 

140 

# GET /users/me/items/: return a one-element list with a fixed item
# ("Foo") whose owner is the authenticated user's username.
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    owned_item = {"item_id": "Foo", "owner": current_user.username}
    return [owned_item]