Coverage for docs/docs_src/user_guide/adapters/fastapi/security/main_1_jwt.py: 83%

60 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-19 12:16 +0000

1from datetime import datetime, timedelta, timezone 1aefghibcd

2from typing import Annotated, Any, Optional, Union 1aefghibcd

3 

4import jwt 1aefghibcd

5from fastapi import Depends, FastAPI, HTTPException, status 1aefghibcd

6from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm 1aefghibcd

7from jwt.exceptions import InvalidTokenError 1aefghibcd

8from passlib.context import CryptContext 1aefghibcd

9from pydantic import BaseModel 1aefghibcd

10 

11from fastagency.adapters.fastapi import FastAPIAdapter 1aefghibcd

12 

13from .workflows import wf 1aefghibcd

14 

15app = FastAPI(title="FastAPI with FastAgency") 1aefghibcd

16 

17################################################################################ 

18# 

19# Taken from https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/ 

20# 

21 

22# to get a string like this run: 

23# openssl rand -hex 32 

24SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # pragma: allowlist secret 1aefghibcd

25ALGORITHM = "HS256" 1aefghibcd

26ACCESS_TOKEN_EXPIRE_MINUTES = 30 1aefghibcd

27 

28 

29fake_users_db = { 1aefghibcd

30 "johndoe": { 

31 "username": "johndoe", 

32 "full_name": "John Doe", 

33 "email": "johndoe@example.com", 

34 "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", # nosemgrep 

35 "disabled": False, 

36 } 

37} 

38 

39 

40pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") 1aefghibcd

41 

42oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") 1aefghibcd

43 

44 

45class User(BaseModel): 1aefghibcd

46 username: str 1aefghibcd

47 email: Union[str, None] = None 1aefghibcd

48 full_name: Union[str, None] = None 1aefghibcd

49 disabled: Union[bool, None] = None 1aefghibcd

50 

51 

52class UserInDB(User): 1aefghibcd

53 hashed_password: str 1aefghibcd

54 

55 

56def verify_password(plain_password: str, hashed_password: str) -> bool: 1aefghibcd

57 return pwd_context.verify(plain_password, hashed_password) # type: ignore 1abcd

58 

59 

60def get_user(db: dict[str, Any], username: str) -> Optional[UserInDB]: 1aefghibcd

61 if username in db: 61 ↛ 64line 61 didn't jump to line 64 because the condition on line 61 was always true1abcd

62 user_dict = db[username] 1abcd

63 return UserInDB(**user_dict) 1abcd

64 return None 

65 

66 

67def authenticate_user( 1aefghibcd

68 fake_db: dict[str, Any], username: str, password: str 1aefghibcd

69) -> Union[bool, UserInDB]: 1aefghibcd

70 user = get_user(fake_db, username) 1abcd

71 if not user: 1abcd

72 return False 

73 if not verify_password(password, user.hashed_password): 1abcd

74 return False 

75 return user 1abcd

76 

77 

78async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserInDB: 1aefghibcd

79 credentials_exception = HTTPException( 1abcd

80 status_code=status.HTTP_401_UNAUTHORIZED, 

81 detail="Could not validate credentials", 

82 headers={"WWW-Authenticate": "Bearer"}, 

83 ) 

84 try: 1abcd

85 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) 1abcd

86 username: str = payload.get("sub") 1abcd

87 if username is None: 87 ↛ 88line 87 didn't jump to line 88 because the condition on line 87 was never true1abcd

88 raise credentials_exception 

89 token_data = TokenData(username=username) 1abcd

90 except InvalidTokenError: 

91 raise credentials_exception from None 

92 user = get_user(fake_users_db, username=token_data.username) # type: ignore 1abcd

93 if user is None: 93 ↛ 94line 93 didn't jump to line 94 because the condition on line 93 was never true1abcd

94 raise credentials_exception 

95 return user 1abcd

96 

97 

98async def get_current_active_user( 1aefghibcd

99 current_user: Annotated[User, Depends(get_current_user)], 

100) -> User: 

101 if current_user.disabled: 101 ↛ 102line 101 didn't jump to line 102 because the condition on line 101 was never true1abcd

102 raise HTTPException(status_code=400, detail="Inactive user") 

103 return current_user 1abcd

104 

105 

106class Token(BaseModel): 1aefghibcd

107 access_token: str 1aefghibcd

108 token_type: str 1aefghibcd

109 

110 

111class TokenData(BaseModel): 1aefghibcd

112 username: Union[str, None] = None 1aefghibcd

113 

114 

115def create_access_token( 1aefghibcd

116 data: dict[str, Any], expires_delta: Union[timedelta, None] = None 

117) -> str: 

118 to_encode = data.copy() 1abcd

119 if expires_delta: 119 ↛ 122line 119 didn't jump to line 122 because the condition on line 119 was always true1abcd

120 expire = datetime.now(timezone.utc) + expires_delta 1abcd

121 else: 

122 expire = datetime.now(timezone.utc) + timedelta(minutes=15) 

123 to_encode.update({"exp": expire}) 1abcd

124 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # nosemgrep 1abcd

125 return encoded_jwt 1abcd

126 

127 

128@app.post("/token") 1aefghibcd

129async def login_for_access_token( 1aefghibcd

130 form_data: Annotated[OAuth2PasswordRequestForm, Depends()], 

131) -> Token: 

132 user = authenticate_user(fake_users_db, form_data.username, form_data.password) 1abcd

133 if not user: 1abcd

134 raise HTTPException( 

135 status_code=status.HTTP_401_UNAUTHORIZED, 

136 detail="Incorrect username or password", 

137 headers={"WWW-Authenticate": "Bearer"}, 

138 ) 

139 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) 1abcd

140 access_token = create_access_token( 1abcd

141 data={"sub": user.username}, # type: ignore 

142 expires_delta=access_token_expires, 

143 ) 

144 return Token(access_token=access_token, token_type="bearer") 1abcd

145 

146 

147# 

148# End of code from https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/ 

149# 

150################################################################################ 

151 

152 

153def get_user_id( 1aefghibcd

154 current_user: Annotated[User, Depends(get_current_active_user)], 

155) -> Optional[str]: 

156 return current_user.username 1abcd

157 

158 

159adapter = FastAPIAdapter(provider=wf, get_user_id=get_user_id) 1aefghibcd

160app.include_router(adapter.router) 1aefghibcd

161 

162 

163# this is optional, but we would like to see the list of available workflows 

164@app.get("/") 1aefghibcd

165def read_root() -> dict[str, dict[str, str]]: 1aefghibcd

166 return {"Workflows": {name: wf.get_description(name) for name in wf.names}}