working on authorization

parent 6ecbec33
__pycache__/
.env/
.venv/
# This is the file where authentication and authorization is implemented
# This file is imported in main.py and used as middleware
#
from datetime import datetime, timedelta, timezone
from requests import Session
from fastapi import HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import jwt
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel
from user.schemas import UserBase
from user.crud import get_user_by_email
#
# This is the secret key used to hash the password
# to get a string like this run:
# openssl rand -hex 32
#
SECRET_KEY = ""
# This is the password hashing algorithm
ALGORITHM = "HS256"
# This is the expiration time of the token in minutes
ACCESS_TOKEN_EXPIRE_MINUTES = 30
#
# DTOs for token and credentials
#
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str
role: str
class UserCredentials(BaseModel):
email: str
password: str
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
# Token creation
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
#
# Authentication
#
def authenticate_user(db: Session, email: str, password: str):
user = get_user_by_email(db, email)
if not user:
return False
if not verify_password(password, user.password):
return False
return user
def login_user(db, email: str, password: str):
user = authenticate_user(db, email, password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email, "role": user.role, "id": user.id},
expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
......@@ -91,3 +91,12 @@ def update_loan(user_id: int, book_id: int, start_date: date, loan: loan_schemas
@app.delete("/loan/{user_id}/{book_id}/{start_date}")
def delete_loan(user_id: int, book_id: int, start_date: date, db: Session = Depends(get_db)):
return loan_crud.delete_loan(db, user_id, book_id, start_date)
# Authentication
from auth import login_user, Token
from fastapi.security import OAuth2PasswordRequestForm
@app.post("/login", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
return login_user(db, form_data.username, form_data.password)
......@@ -2,7 +2,7 @@ from sqlalchemy.orm import Session
from fastapi import HTTPException
from passlib.context import CryptContext
from .models import UserDB
from .schemas import UserCreate, User
from .schemas import UserCreate
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
......@@ -35,6 +35,10 @@ def get_user(db: Session, user_id: int):
raise HTTPException(status_code=404, detail="User not found")
return db_user
def get_user_by_email(db: Session, email: str):
db_user = db.query(UserDB).filter(UserDB.email == email).first()
return db_user
def update_user(db: Session, user_id: int, user: UserCreate):
db_user = db.query(UserDB).filter(UserDB.id == user_id).first()
if db_user is None:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment