users.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. from typing import Any
  2. from app.api.deps import CurrentUser, SessionDep, get_current_active_superuser
  3. from app.common.security import get_password_hash, verify_password
  4. from app.config import settings
  5. from app.models import crud
  6. from app.models.item import Item
  7. from app.models.models import Message
  8. from app.models.user import (UpdatePassword, User, UserCreate, UserCreateOpen,
  9. UserOut, UsersOut, UserUpdate, UserUpdateMe)
  10. from app.utils.mail_util import generate_new_account_email, send_email
  11. from sqlmodel import col, delete, func, select
  12. from fastapi import APIRouter, Depends, HTTPException
  13. router = APIRouter()
  14. @router.get(
  15. "/", dependencies=[Depends(get_current_active_superuser)], response_model=UsersOut
  16. )
  17. def read_users(session: SessionDep, skip: int = 0, limit: int = 100) -> Any:
  18. """
  19. Retrieve users.
  20. """
  21. count_statement = select(func.count()).select_from(User)
  22. count = session.exec(count_statement).one()
  23. statement = select(User).offset(skip).limit(limit)
  24. users = session.exec(statement).all()
  25. return UsersOut(data=users, count=count)
  26. @router.post(
  27. "/", dependencies=[Depends(get_current_active_superuser)], response_model=UserOut
  28. )
  29. def create_user(*, session: SessionDep, user_in: UserCreate) -> Any:
  30. """
  31. Create new user.
  32. """
  33. user = crud.get_user_by_email(session=session, email=user_in.email)
  34. if user:
  35. raise HTTPException(
  36. status_code=400,
  37. detail="The user with this email already exists in the system.",
  38. )
  39. user = crud.create_user(session=session, user_create=user_in)
  40. if settings.emails_enabled and user_in.email:
  41. email_data = generate_new_account_email(
  42. email_to=user_in.email, username=user_in.email, password=user_in.password
  43. )
  44. send_email(
  45. email_to=user_in.email,
  46. subject=email_data.subject,
  47. html_content=email_data.html_content,
  48. )
  49. return user
  50. @router.patch("/me", response_model=UserOut)
  51. def update_user_me(
  52. *, session: SessionDep, user_in: UserUpdateMe, current_user: CurrentUser
  53. ) -> Any:
  54. """
  55. Update own user.
  56. """
  57. if user_in.email:
  58. existing_user = crud.get_user_by_email(session=session, email=user_in.email)
  59. if existing_user and existing_user.id != current_user.id:
  60. raise HTTPException(
  61. status_code=409, detail="User with this email already exists"
  62. )
  63. user_data = user_in.model_dump(exclude_unset=True)
  64. current_user.sqlmodel_update(user_data)
  65. session.add(current_user)
  66. session.commit()
  67. session.refresh(current_user)
  68. return current_user
  69. @router.patch("/me/password", response_model=Message)
  70. def update_password_me(
  71. *, session: SessionDep, body: UpdatePassword, current_user: CurrentUser
  72. ) -> Any:
  73. """
  74. Update own password.
  75. """
  76. if not verify_password(body.current_password, current_user.hashed_password):
  77. raise HTTPException(status_code=400, detail="Incorrect password")
  78. if body.current_password == body.new_password:
  79. raise HTTPException(
  80. status_code=400, detail="New password cannot be the same as the current one"
  81. )
  82. hashed_password = get_password_hash(body.new_password)
  83. current_user.hashed_password = hashed_password
  84. session.add(current_user)
  85. session.commit()
  86. return Message(message="Password updated successfully")
  87. @router.get("/me", response_model=UserOut)
  88. def read_user_me(session: SessionDep, current_user: CurrentUser) -> Any:
  89. """
  90. Get current user.
  91. """
  92. return current_user
  93. @router.post("/open", response_model=UserOut)
  94. def create_user_open(session: SessionDep, user_in: UserCreateOpen) -> Any:
  95. """
  96. Create new user without the need to be logged in.
  97. """
  98. if not settings.USERS_OPEN_REGISTRATION:
  99. raise HTTPException(
  100. status_code=403,
  101. detail="Open user registration is forbidden on this server",
  102. )
  103. user = crud.get_user_by_email(session=session, email=user_in.email)
  104. if user:
  105. raise HTTPException(
  106. status_code=400,
  107. detail="The user with this email already exists in the system",
  108. )
  109. user_create = UserCreate.from_orm(user_in)
  110. user = crud.create_user(session=session, user_create=user_create)
  111. return user
  112. @router.get("/{user_id}", response_model=UserOut)
  113. def read_user_by_id(
  114. user_id: int, session: SessionDep, current_user: CurrentUser
  115. ) -> Any:
  116. """
  117. Get a specific user by id.
  118. """
  119. user = session.get(User, user_id)
  120. if user == current_user:
  121. return user
  122. if not current_user.is_superuser:
  123. raise HTTPException(
  124. status_code=403,
  125. detail="The user doesn't have enough privileges",
  126. )
  127. return user
  128. @router.patch(
  129. "/{user_id}",
  130. dependencies=[Depends(get_current_active_superuser)],
  131. response_model=UserOut,
  132. )
  133. def update_user(
  134. *,
  135. session: SessionDep,
  136. user_id: int,
  137. user_in: UserUpdate,
  138. ) -> Any:
  139. """
  140. Update a user.
  141. """
  142. db_user = session.get(User, user_id)
  143. if not db_user:
  144. raise HTTPException(
  145. status_code=404,
  146. detail="The user with this id does not exist in the system",
  147. )
  148. if user_in.email:
  149. existing_user = crud.get_user_by_email(session=session, email=user_in.email)
  150. if existing_user and existing_user.id != user_id:
  151. raise HTTPException(
  152. status_code=409, detail="User with this email already exists"
  153. )
  154. db_user = crud.update_user(session=session, db_user=db_user, user_in=user_in)
  155. return db_user
  156. @router.delete("/{user_id}")
  157. def delete_user(
  158. session: SessionDep, current_user: CurrentUser, user_id: int
  159. ) -> Message:
  160. """
  161. Delete a user.
  162. """
  163. user = session.get(User, user_id)
  164. if not user:
  165. raise HTTPException(status_code=404, detail="User not found")
  166. elif user != current_user and not current_user.is_superuser:
  167. raise HTTPException(
  168. status_code=403, detail="The user doesn't have enough privileges"
  169. )
  170. elif user == current_user and current_user.is_superuser:
  171. raise HTTPException(
  172. status_code=403, detail="Super users are not allowed to delete themselves"
  173. )
  174. statement = delete(Item).where(col(Item.owner_id) == user_id)
  175. session.exec(statement) # type: ignore
  176. session.delete(user)
  177. session.commit()
  178. return Message(message="User deleted successfully")