from datetime import timedelta from typing import Annotated, Any from app.api.deps import CurrentUser, SessionDep, get_current_active_superuser from app.common import security from app.common.security import get_password_hash from app.config import settings from app.models import crud from app.models.models import Message, NewPassword, Token from app.models.user import UserOut from app.utils.mail_util import (generate_password_reset_token, generate_reset_password_email, send_email, verify_password_reset_token) from fastapi import APIRouter, Depends, HTTPException from fastapi.responses import HTMLResponse from fastapi.security import OAuth2PasswordRequestForm router = APIRouter() @router.post("/login/access-token") def login_access_token( session: SessionDep, form_data: Annotated[OAuth2PasswordRequestForm, Depends()] ) -> Token: """ OAuth2 compatible token login, get an access token for future requests """ user = crud.authenticate( session=session, email=form_data.username, password=form_data.password ) if not user: raise HTTPException(status_code=400, detail="Incorrect email or password") elif not user.is_active: raise HTTPException(status_code=400, detail="Inactive user") access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) return Token( access_token=security.create_access_token( user.id, expires_delta=access_token_expires ) ) @router.post("/login/test-token", response_model=UserOut) def test_token(current_user: CurrentUser) -> Any: """ Test access token """ return current_user @router.post("/password-recovery/{email}") def recover_password(email: str, session: SessionDep) -> Message: """ Password Recovery """ user = crud.get_user_by_email(session=session, email=email) if not user: raise HTTPException( status_code=404, detail="The user with this email does not exist in the system.", ) password_reset_token = generate_password_reset_token(email=email) email_data = generate_reset_password_email( email_to=user.email, email=email, token=password_reset_token ) send_email( email_to=user.email, subject=email_data.subject, html_content=email_data.html_content, ) return Message(message="Password recovery email sent") @router.post("/reset-password/") def reset_password(session: SessionDep, body: NewPassword) -> Message: """ Reset password """ # STRONG_PASSWORD_PATTERN = re.compile(r"^(?=.*[\d])(?=.*[!@#$%^&*])[\w!@#$%^&*]{6,128}$") email = verify_password_reset_token(token=body.token) if not email: raise HTTPException(status_code=400, detail="Invalid token") user = crud.get_user_by_email(session=session, email=email) if not user: raise HTTPException( status_code=404, detail="The user with this email does not exist in the system.", ) elif not user.is_active: raise HTTPException(status_code=400, detail="Inactive user") hashed_password = get_password_hash(password=body.new_password) user.hashed_password = hashed_password session.add(user) session.commit() return Message(message="Password updated successfully") @router.post( "/password-recovery-html-content/{email}", dependencies=[Depends(get_current_active_superuser)], response_class=HTMLResponse, ) def recover_password_html_content(email: str, session: SessionDep) -> Any: """ HTML Content for Password Recovery """ user = crud.get_user_by_email(session=session, email=email) if not user: raise HTTPException( status_code=404, detail="The user with this username does not exist in the system.", ) password_reset_token = generate_password_reset_token(email=email) email_data = generate_reset_password_email( email_to=user.email, email=email, token=password_reset_token ) return HTMLResponse( content=email_data.html_content, headers={"subject:": email_data.subject} ) @router.get("/healthcheck", include_in_schema=False) async def healthcheck() -> dict[str, str]: return {"status": "ok"} # @router.post("/signup", response_model=UserOut) # async def signup(user: UserOut = Body(...)): # admin_exists = await User.find_one(User.email ==user.email) # if admin_exists: # raise HTTPException( # status_code=409, detail = "user is exists." # ) # admin.password = # new_admin = await add_user(admin) # return new_admin