Coverage for docs/docs_src/user_guide/adapters/fastapi/security/main_1_jwt.py: 83%
60 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-19 12:16 +0000
1from datetime import datetime, timedelta, timezone 1aefghibcd
2from typing import Annotated, Any, Optional, Union 1aefghibcd
4import jwt 1aefghibcd
5from fastapi import Depends, FastAPI, HTTPException, status 1aefghibcd
6from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm 1aefghibcd
7from jwt.exceptions import InvalidTokenError 1aefghibcd
8from passlib.context import CryptContext 1aefghibcd
9from pydantic import BaseModel 1aefghibcd
11from fastagency.adapters.fastapi import FastAPIAdapter 1aefghibcd
13from .workflows import wf 1aefghibcd
# ASGI application object; the routes and the FastAgency router defined in
# this module are registered on it.
app = FastAPI(
    title="FastAPI with FastAgency",
)
17################################################################################
18#
19# Taken from https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
20#
22# to get a string like this run:
23# openssl rand -hex 32
# Key and algorithm used to sign and verify the JWT access tokens issued by
# this module.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"  # pragma: allowlist secret
ALGORITHM = "HS256"

# Token lifetime applied by the /token endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30


# In-memory user table keyed by username; each record is unpacked into a
# UserInDB model by get_user().
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",  # nosemgrep
        "disabled": False,
    },
}


# Password-hash context (bcrypt) used by verify_password().
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Dependency that extracts the bearer token from a request; tokens are
# obtained from the "token" URL.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Public view of a user account. Only ``username`` is required; the other
# fields default to None.
# (Documented with comments rather than a docstring so the generated schema
# description stays unchanged.)
class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    # Checked by get_current_active_user(); a truthy value rejects the request.
    disabled: Optional[bool] = None
# Stored form of a user: every User field plus the password hash that
# authenticate_user() passes to verify_password().
class UserInDB(User):
    hashed_password: str
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check *plain_password* against *hashed_password* using ``pwd_context``.

    Returns whatever ``pwd_context.verify`` reports for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match  # type: ignore
def get_user(db: dict[str, Any], username: str) -> Optional[UserInDB]:
    """Look up *username* in *db* and return it as a ``UserInDB``.

    Returns ``None`` when *db* has no entry for *username*.
    """
    try:
        record = db[username]
    except KeyError:
        return None
    # Built outside the ``try`` so errors from model construction propagate.
    return UserInDB(**record)
def authenticate_user(
    fake_db: dict[str, Any], username: str, password: str
) -> Union[bool, UserInDB]:
    """Return the stored user when *username* and *password* match.

    Returns ``False`` when the user is not in *fake_db* or when the password
    does not verify against the stored hash.
    """
    candidate = get_user(fake_db, username)
    # Short-circuit: the hash is only checked when a user record was found.
    if candidate and verify_password(password, candidate.hashed_password):
        return candidate
    return False
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> UserInDB:
    """Resolve the bearer *token* to the user stored in ``fake_users_db``.

    The token is decoded with ``SECRET_KEY`` / ``ALGORITHM`` and its ``sub``
    claim is used as the username for the lookup.

    Raises:
        HTTPException: 401 when decoding raises ``InvalidTokenError``, when
            the ``sub`` claim is missing, or when no such user exists.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Only the decode call sits inside the ``try``: it is the one step that
    # can raise InvalidTokenError.
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError:
        raise unauthorized from None

    subject: str = claims.get("sub")
    if subject is None:
        raise unauthorized
    token_data = TokenData(username=subject)

    account = get_user(fake_users_db, username=token_data.username)  # type: ignore
    if account is None:
        raise unauthorized
    return account
async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    """Return *current_user* unless the account is flagged as disabled.

    Raises:
        HTTPException: 400 ("Inactive user") when ``current_user.disabled``
            is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
# Body returned by the /token endpoint: the encoded JWT and its token type.
# (Documented with comments rather than a docstring so the generated schema
# description stays unchanged.)
class Token(BaseModel):
    access_token: str
    token_type: str
# Holder for the username taken from a decoded token's "sub" claim
# (see get_current_user()).
class TokenData(BaseModel):
    username: Optional[str] = None
def create_access_token(
    data: dict[str, Any], expires_delta: Union[timedelta, None] = None
) -> str:
    """Encode *data* as a signed JWT with an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The mapping is not modified.
        expires_delta: Lifetime of the token, measured from the current UTC
            time. When omitted (``None``) the token expires after 15 minutes.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Compare against None explicitly: ``timedelta(0)`` is falsy, so a plain
    # truthiness check would silently turn an explicit zero lifetime into the
    # 15-minute default.
    lifetime = timedelta(minutes=15) if expires_delta is None else expires_delta
    claims = {**data, "exp": datetime.now(timezone.utc) + lifetime}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)  # nosemgrep
# Token endpoint: checks the submitted form credentials against
# fake_users_db and, on success, returns a bearer token whose "sub" claim is
# the username and whose lifetime is ACCESS_TOKEN_EXPIRE_MINUTES.
# Responds with 401 when authenticate_user() rejects the credentials.
# (Documented with comments rather than a docstring so the generated OpenAPI
# description stays unchanged.)
@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    account = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not account:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    encoded = create_access_token(
        data={"sub": account.username},  # type: ignore
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return Token(access_token=encoded, token_type="bearer")
147#
148# End of code from https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
149#
150################################################################################
def get_user_id(
    current_user: Annotated[User, Depends(get_current_active_user)],
) -> Optional[str]:
    """Return the username of the authenticated, active user.

    Passed to ``FastAPIAdapter`` as its ``get_user_id`` callable.
    """
    user_id = current_user.username
    return user_id
# Build the FastAgency adapter around the workflows in ``wf``, using
# get_user_id to identify the caller, and mount its router on the app.
adapter = FastAPIAdapter(
    provider=wf,
    get_user_id=get_user_id,
)
app.include_router(adapter.router)
# This route is optional, but it lets us see the list of available workflows:
# the response maps each workflow name in ``wf`` to its description.
@app.get("/")
def read_root() -> dict[str, dict[str, str]]:
    descriptions: dict[str, str] = {}
    for name in wf.names:
        descriptions[name] = wf.get_description(name)
    return {"Workflows": descriptions}