crud.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. from typing import Any
  2. from apps.common.security import get_password_hash, verify_password
  3. from apps.models.item import Item, ItemCreate
  4. from apps.models.user import User, UserCreate, UserUpdate
  5. from sqlmodel import Session, select
  6. def create_user(*, session: Session, user_create: UserCreate) -> User:
  7. db_obj = User.model_validate(
  8. user_create, update={"hashed_password": get_password_hash(user_create.password)}
  9. )
  10. session.add(db_obj)
  11. session.commit()
  12. session.refresh(db_obj)
  13. return db_obj
  14. def update_user(*, session: Session, db_user: User, user_in: UserUpdate) -> Any:
  15. user_data = user_in.model_dump(exclude_unset=True)
  16. extra_data = {}
  17. if "password" in user_data:
  18. password = user_data["password"]
  19. hashed_password = get_password_hash(password)
  20. extra_data["hashed_password"] = hashed_password
  21. db_user.sqlmodel_update(user_data, update=extra_data)
  22. session.add(db_user)
  23. session.commit()
  24. session.refresh(db_user)
  25. return db_user
  26. def get_user_by_email(*, session: Session, email: str) -> User | None:
  27. statement = select(User).where(User.email == email)
  28. session_user = session.exec(statement).first()
  29. return session_user
  30. def authenticate(*, session: Session, email: str, password: str) -> User | None:
  31. db_user = get_user_by_email(session=session, email=email)
  32. if not db_user:
  33. return None
  34. if not verify_password(password, db_user.hashed_password):
  35. return None
  36. return db_user
  37. def create_item(*, session: Session, item_in: ItemCreate, owner_id: int) -> Item:
  38. db_item = Item.model_validate(item_in, update={"owner_id": owner_id})
  39. session.add(db_item)
  40. session.commit()
  41. session.refresh(db_item)
  42. return db_item