123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454 |
- from unittest.mock import patch
- from app import crud
- from app.core.config import settings
- from apps.models import UserCreate
- from apps.tests.utils.utils import random_email, random_lower_string
- from sqlmodel import Session
- from fastapi.testclient import TestClient
- def test_get_users_superuser_me(
- client: TestClient, superuser_token_headers: dict[str, str]
- ) -> None:
- r = client.get(f"{settings.API_V1_STR}/users/me", headers=superuser_token_headers)
- current_user = r.json()
- assert current_user
- assert current_user["is_active"] is True
- assert current_user["is_superuser"]
- assert current_user["email"] == settings.FIRST_SUPERUSER
- def test_get_users_normal_user_me(
- client: TestClient, normal_user_token_headers: dict[str, str]
- ) -> None:
- r = client.get(f"{settings.API_V1_STR}/users/me", headers=normal_user_token_headers)
- current_user = r.json()
- assert current_user
- assert current_user["is_active"] is True
- assert current_user["is_superuser"] is False
- assert current_user["email"] == settings.EMAIL_TEST_USER
- def test_create_user_new_email(
- client: TestClient, superuser_token_headers: dict[str, str], db: Session
- ) -> None:
- with patch("app.utils.send_email", return_value=None), patch(
- "app.core.config.settings.SMTP_HOST", "smtp.example.com"
- ), patch("app.core.config.settings.SMTP_USER", "admin@example.com"):
- username = random_email()
- password = random_lower_string()
- data = {"email": username, "password": password}
- r = client.post(
- f"{settings.API_V1_STR}/users/",
- headers=superuser_token_headers,
- json=data,
- )
- assert 200 <= r.status_code < 300
- created_user = r.json()
- user = crud.get_user_by_email(session=db, email=username)
- assert user
- assert user.email == created_user["email"]
- def test_get_existing_user(
- client: TestClient, superuser_token_headers: dict[str, str], db: Session
- ) -> None:
- username = random_email()
- password = random_lower_string()
- user_in = UserCreate(email=username, password=password)
- user = crud.create_user(session=db, user_create=user_in)
- user_id = user.id
- r = client.get(
- f"{settings.API_V1_STR}/users/{user_id}",
- headers=superuser_token_headers,
- )
- assert 200 <= r.status_code < 300
- api_user = r.json()
- existing_user = crud.get_user_by_email(session=db, email=username)
- assert existing_user
- assert existing_user.email == api_user["email"]
- def test_get_existing_user_current_user(client: TestClient, db: Session) -> None:
- username = random_email()
- password = random_lower_string()
- user_in = UserCreate(email=username, password=password)
- user = crud.create_user(session=db, user_create=user_in)
- user_id = user.id
- login_data = {
- "username": username,
- "password": password,
- }
- r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
- tokens = r.json()
- a_token = tokens["access_token"]
- headers = {"Authorization": f"Bearer {a_token}"}
- r = client.get(
- f"{settings.API_V1_STR}/users/{user_id}",
- headers=headers,
- )
- assert 200 <= r.status_code < 300
- api_user = r.json()
- existing_user = crud.get_user_by_email(session=db, email=username)
- assert existing_user
- assert existing_user.email == api_user["email"]
- def test_get_existing_user_permissions_error(
- client: TestClient, normal_user_token_headers: dict[str, str], db: Session
- ) -> None:
- r = client.get(
- f"{settings.API_V1_STR}/users/999999",
- headers=normal_user_token_headers,
- )
- assert r.status_code == 403
- assert r.json() == {"detail": "The user doesn't have enough privileges"}
- def test_create_user_existing_username(
- client: TestClient, superuser_token_headers: dict[str, str], db: Session
- ) -> None:
- username = random_email()
- # username = email
- password = random_lower_string()
- user_in = UserCreate(email=username, password=password)
- crud.create_user(session=db, user_create=user_in)
- data = {"email": username, "password": password}
- r = client.post(
- f"{settings.API_V1_STR}/users/",
- headers=superuser_token_headers,
- json=data,
- )
- created_user = r.json()
- assert r.status_code == 400
- assert "_id" not in created_user
- def test_create_user_by_normal_user(
- client: TestClient, normal_user_token_headers: dict[str, str]
- ) -> None:
- username = random_email()
- password = random_lower_string()
- data = {"email": username, "password": password}
- r = client.post(
- f"{settings.API_V1_STR}/users/",
- headers=normal_user_token_headers,
- json=data,
- )
- assert r.status_code == 400
- def test_retrieve_users(
- client: TestClient, superuser_token_headers: dict[str, str], db: Session
- ) -> None:
- username = random_email()
- password = random_lower_string()
- user_in = UserCreate(email=username, password=password)
- crud.create_user(session=db, user_create=user_in)
- username2 = random_email()
- password2 = random_lower_string()
- user_in2 = UserCreate(email=username2, password=password2)
- crud.create_user(session=db, user_create=user_in2)
- r = client.get(f"{settings.API_V1_STR}/users/", headers=superuser_token_headers)
- all_users = r.json()
- assert len(all_users["data"]) > 1
- assert "count" in all_users
- for item in all_users["data"]:
- assert "email" in item
- def test_update_user_me(
- client: TestClient, normal_user_token_headers: dict[str, str], db: Session
- ) -> None:
- full_name = "Updated Name"
- email = random_email()
- data = {"full_name": full_name, "email": email}
- r = client.patch(
- f"{settings.API_V1_STR}/users/me",
- headers=normal_user_token_headers,
- json=data,
- )
- assert r.status_code == 200
- updated_user = r.json()
- assert updated_user["email"] == email
- assert updated_user["full_name"] == full_name
- def test_update_password_me(
- client: TestClient, superuser_token_headers: dict[str, str], db: Session
- ) -> None:
- new_password = random_lower_string()
- data = {
- "current_password": settings.FIRST_SUPERUSER_PASSWORD,
- "new_password": new_password,
- }
- r = client.patch(
- f"{settings.API_V1_STR}/users/me/password",
- headers=superuser_token_headers,
- json=data,
- )
- assert r.status_code == 200
- updated_user = r.json()
- assert updated_user["message"] == "Password updated successfully"
- # Revert to the old password to keep consistency in test
- old_data = {
- "current_password": new_password,
- "new_password": settings.FIRST_SUPERUSER_PASSWORD,
- }
- r = client.patch(
- f"{settings.API_V1_STR}/users/me/password",
- headers=superuser_token_headers,
- json=old_data,
- )
- assert r.status_code == 200
- def test_update_password_me_incorrect_password(
- client: TestClient, superuser_token_headers: dict[str, str], db: Session
- ) -> None:
- new_password = random_lower_string()
- data = {"current_password": new_password, "new_password": new_password}
- r = client.patch(
- f"{settings.API_V1_STR}/users/me/password",
- headers=superuser_token_headers,
- json=data,
- )
- assert r.status_code == 400
- updated_user = r.json()
- assert updated_user["detail"] == "Incorrect password"
- def test_update_user_me_email_exists(
- client: TestClient, normal_user_token_headers: dict[str, str], db: Session
- ) -> None:
- username = random_email()
- password = random_lower_string()
- user_in = UserCreate(email=username, password=password)
- user = crud.create_user(session=db, user_create=user_in)
- data = {"email": user.email}
- r = client.patch(
- f"{settings.API_V1_STR}/users/me",
- headers=normal_user_token_headers,
- json=data,
- )
- assert r.status_code == 409
- assert r.json()["detail"] == "User with this email already exists"
- def test_update_password_me_same_password_error(
- client: TestClient, superuser_token_headers: dict[str, str], db: Session
- ) -> None:
- data = {
- "current_password": settings.FIRST_SUPERUSER_PASSWORD,
- "new_password": settings.FIRST_SUPERUSER_PASSWORD,
- }
- r = client.patch(
- f"{settings.API_V1_STR}/users/me/password",
- headers=superuser_token_headers,
- json=data,
- )
- assert r.status_code == 400
- updated_user = r.json()
- assert (
- updated_user["detail"] == "New password cannot be the same as the current one"
- )
- def test_create_user_open(client: TestClient) -> None:
- with patch("app.core.config.settings.USERS_OPEN_REGISTRATION", True):
- username = random_email()
- password = random_lower_string()
- full_name = random_lower_string()
- data = {"email": username, "password": password, "full_name": full_name}
- r = client.post(
- f"{settings.API_V1_STR}/users/open",
- json=data,
- )
- assert r.status_code == 200
- created_user = r.json()
- assert created_user["email"] == username
- assert created_user["full_name"] == full_name
- def test_create_user_open_forbidden_error(client: TestClient) -> None:
- with patch("app.core.config.settings.USERS_OPEN_REGISTRATION", False):
- username = random_email()
- password = random_lower_string()
- full_name = random_lower_string()
- data = {"email": username, "password": password, "full_name": full_name}
- r = client.post(
- f"{settings.API_V1_STR}/users/open",
- json=data,
- )
- assert r.status_code == 403
- assert (
- r.json()["detail"] == "Open user registration is forbidden on this server"
- )
- def test_create_user_open_already_exists_error(client: TestClient) -> None:
- with patch("app.core.config.settings.USERS_OPEN_REGISTRATION", True):
- password = random_lower_string()
- full_name = random_lower_string()
- data = {
- "email": settings.FIRST_SUPERUSER,
- "password": password,
- "full_name": full_name,
- }
- r = client.post(
- f"{settings.API_V1_STR}/users/open",
- json=data,
- )
- assert r.status_code == 400
- assert (
- r.json()["detail"]
- == "The user with this email already exists in the system"
- )
- def test_update_user(
- client: TestClient, superuser_token_headers: dict[str, str], db: Session
- ) -> None:
- username = random_email()
- password = random_lower_string()
- user_in = UserCreate(email=username, password=password)
- user = crud.create_user(session=db, user_create=user_in)
- data = {"full_name": "Updated_full_name"}
- r = client.patch(
- f"{settings.API_V1_STR}/users/{user.id}",
- headers=superuser_token_headers,
- json=data,
- )
- assert r.status_code == 200
- updated_user = r.json()
- assert updated_user["full_name"] == "Updated_full_name"
- def test_update_user_not_exists(
- client: TestClient, superuser_token_headers: dict[str, str], db: Session
- ) -> None:
- data = {"full_name": "Updated_full_name"}
- r = client.patch(
- f"{settings.API_V1_STR}/users/99999999",
- headers=superuser_token_headers,
- json=data,
- )
- assert r.status_code == 404
- assert r.json()["detail"] == "The user with this id does not exist in the system"
- def test_update_user_email_exists(
- client: TestClient, superuser_token_headers: dict[str, str], db: Session
- ) -> None:
- username = random_email()
- password = random_lower_string()
- user_in = UserCreate(email=username, password=password)
- user = crud.create_user(session=db, user_create=user_in)
- username2 = random_email()
- password2 = random_lower_string()
- user_in2 = UserCreate(email=username2, password=password2)
- user2 = crud.create_user(session=db, user_create=user_in2)
- data = {"email": user2.email}
- r = client.patch(
- f"{settings.API_V1_STR}/users/{user.id}",
- headers=superuser_token_headers,
- json=data,
- )
- assert r.status_code == 409
- assert r.json()["detail"] == "User with this email already exists"
- def test_delete_user_super_user(
- client: TestClient, superuser_token_headers: dict[str, str], db: Session
- ) -> None:
- username = random_email()
- password = random_lower_string()
- user_in = UserCreate(email=username, password=password)
- user = crud.create_user(session=db, user_create=user_in)
- user_id = user.id
- r = client.delete(
- f"{settings.API_V1_STR}/users/{user_id}",
- headers=superuser_token_headers,
- )
- assert r.status_code == 200
- deleted_user = r.json()
- assert deleted_user["message"] == "User deleted successfully"
- def test_delete_user_current_user(client: TestClient, db: Session) -> None:
- username = random_email()
- password = random_lower_string()
- user_in = UserCreate(email=username, password=password)
- user = crud.create_user(session=db, user_create=user_in)
- user_id = user.id
- login_data = {
- "username": username,
- "password": password,
- }
- r = client.post(f"{settings.API_V1_STR}/login/access-token", data=login_data)
- tokens = r.json()
- a_token = tokens["access_token"]
- headers = {"Authorization": f"Bearer {a_token}"}
- r = client.delete(
- f"{settings.API_V1_STR}/users/{user_id}",
- headers=headers,
- )
- assert r.status_code == 200
- deleted_user = r.json()
- assert deleted_user["message"] == "User deleted successfully"
- def test_delete_user_not_found(
- client: TestClient, superuser_token_headers: dict[str, str], db: Session
- ) -> None:
- r = client.delete(
- f"{settings.API_V1_STR}/users/99999999",
- headers=superuser_token_headers,
- )
- assert r.status_code == 404
- assert r.json()["detail"] == "User not found"
- def test_delete_user_current_super_user_error(
- client: TestClient, superuser_token_headers: dict[str, str], db: Session
- ) -> None:
- super_user = crud.get_user_by_email(session=db, email=settings.FIRST_SUPERUSER)
- assert super_user
- user_id = super_user.id
- r = client.delete(
- f"{settings.API_V1_STR}/users/{user_id}",
- headers=superuser_token_headers,
- )
- assert r.status_code == 403
- assert r.json()["detail"] == "Super users are not allowed to delete themselves"
- def test_delete_user_without_privileges(
- client: TestClient, normal_user_token_headers: dict[str, str], db: Session
- ) -> None:
- username = random_email()
- password = random_lower_string()
- user_in = UserCreate(email=username, password=password)
- user = crud.create_user(session=db, user_create=user_in)
- r = client.delete(
- f"{settings.API_V1_STR}/users/{user.id}",
- headers=normal_user_token_headers,
- )
- assert r.status_code == 403
- assert r.json()["detail"] == "The user doesn't have enough privileges"
|