test_users.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. from unittest.mock import patch
  2. from app import crud
  3. from app.core.config import settings
  4. from apps.models import UserCreate
  5. from apps.tests.utils.utils import random_email, random_lower_string
  6. from sqlmodel import Session
  7. from fastapi.testclient import TestClient
  def test_get_users_superuser_me(
      client: TestClient, superuser_token_headers: dict[str, str]
  ) -> None:
      """Request /users/me with the superuser headers and check the returned profile."""
      me_url = f"{settings.API_V1_STR}/users/me"
      response = client.get(me_url, headers=superuser_token_headers)
      profile = response.json()
      # The body must be non-empty and describe the configured first superuser.
      assert profile
      assert profile["is_active"] is True
      assert profile["is_superuser"]
      assert profile["email"] == settings.FIRST_SUPERUSER
  def test_get_users_normal_user_me(
      client: TestClient, normal_user_token_headers: dict[str, str]
  ) -> None:
      """Request /users/me with the normal-user headers and check the returned profile."""
      me_url = f"{settings.API_V1_STR}/users/me"
      response = client.get(me_url, headers=normal_user_token_headers)
      profile = response.json()
      # The body must be non-empty and describe the active, non-superuser test account.
      assert profile
      assert profile["is_active"] is True
      assert profile["is_superuser"] is False
      assert profile["email"] == settings.EMAIL_TEST_USER
  def test_create_user_new_email(
      client: TestClient, superuser_token_headers: dict[str, str], db: Session
  ) -> None:
      """POST /users/ as superuser with a random email and check the user is stored.

      ``app.utils.send_email`` is replaced by a mock and the SMTP host/user
      settings are patched while the request and the assertions run.
      """
      with patch("app.utils.send_email", return_value=None):
          with patch("app.core.config.settings.SMTP_HOST", "smtp.example.com"):
              with patch("app.core.config.settings.SMTP_USER", "admin@example.com"):
                  email = random_email()
                  secret = random_lower_string()
                  response = client.post(
                      f"{settings.API_V1_STR}/users/",
                      headers=superuser_token_headers,
                      json={"email": email, "password": secret},
                  )
                  assert 200 <= response.status_code < 300
                  body = response.json()
                  # The API response must agree with what is now in the database.
                  stored = crud.get_user_by_email(session=db, email=email)
                  assert stored
                  assert stored.email == body["email"]
  def test_get_existing_user(
      client: TestClient, superuser_token_headers: dict[str, str], db: Session
  ) -> None:
      """Create a user through crud, fetch it by id as superuser, and compare emails."""
      email = random_email()
      secret = random_lower_string()
      created = crud.create_user(
          session=db, user_create=UserCreate(email=email, password=secret)
      )
      response = client.get(
          f"{settings.API_V1_STR}/users/{created.id}",
          headers=superuser_token_headers,
      )
      assert 200 <= response.status_code < 300
      body = response.json()
      # Look the user up again by email and compare against the API payload.
      stored = crud.get_user_by_email(session=db, email=email)
      assert stored
      assert stored.email == body["email"]
  def test_get_existing_user_current_user(client: TestClient, db: Session) -> None:
      """Create a user, log in as that user, and fetch the user's own record by id."""
      email = random_email()
      secret = random_lower_string()
      created = crud.create_user(
          session=db, user_create=UserCreate(email=email, password=secret)
      )
      target_id = created.id

      # Obtain a bearer token for the new user through the login endpoint.
      login_response = client.post(
          f"{settings.API_V1_STR}/login/access-token",
          data={"username": email, "password": secret},
      )
      access_token = login_response.json()["access_token"]
      auth_headers = {"Authorization": f"Bearer {access_token}"}

      response = client.get(
          f"{settings.API_V1_STR}/users/{target_id}",
          headers=auth_headers,
      )
      assert 200 <= response.status_code < 300
      body = response.json()
      stored = crud.get_user_by_email(session=db, email=email)
      assert stored
      assert stored.email == body["email"]
  def test_get_existing_user_permissions_error(
      client: TestClient, normal_user_token_headers: dict[str, str], db: Session
  ) -> None:
      """A normal user requesting /users/999999 gets a 403 with the privileges message."""
      target_url = f"{settings.API_V1_STR}/users/999999"
      response = client.get(target_url, headers=normal_user_token_headers)
      expected_body = {"detail": "The user doesn't have enough privileges"}
      assert response.status_code == 403
      assert response.json() == expected_body
  def test_create_user_existing_username(
      client: TestClient, superuser_token_headers: dict[str, str], db: Session
  ) -> None:
      """Creating a user through the API with an already-stored email returns 400."""
      # The email doubles as the username.
      email = random_email()
      secret = random_lower_string()
      # Store the user directly first so the API call below hits a duplicate.
      crud.create_user(
          session=db, user_create=UserCreate(email=email, password=secret)
      )
      response = client.post(
          f"{settings.API_V1_STR}/users/",
          headers=superuser_token_headers,
          json={"email": email, "password": secret},
      )
      body = response.json()
      assert response.status_code == 400
      assert "_id" not in body
  def test_create_user_by_normal_user(
      client: TestClient, normal_user_token_headers: dict[str, str]
  ) -> None:
      """POST /users/ with normal-user headers is answered with status 400."""
      email = random_email()
      secret = random_lower_string()
      payload = {"email": email, "password": secret}
      users_url = f"{settings.API_V1_STR}/users/"
      response = client.post(
          users_url, headers=normal_user_token_headers, json=payload
      )
      assert response.status_code == 400
  def test_retrieve_users(
      client: TestClient, superuser_token_headers: dict[str, str], db: Session
  ) -> None:
      """Store two users, list /users/ as superuser, and check the listing shape."""
      first_email = random_email()
      first_secret = random_lower_string()
      crud.create_user(
          session=db,
          user_create=UserCreate(email=first_email, password=first_secret),
      )

      second_email = random_email()
      second_secret = random_lower_string()
      crud.create_user(
          session=db,
          user_create=UserCreate(email=second_email, password=second_secret),
      )

      response = client.get(
          f"{settings.API_V1_STR}/users/", headers=superuser_token_headers
      )
      listing = response.json()
      # More than one entry is expected, plus a "count" field next to "data".
      assert len(listing["data"]) > 1
      assert "count" in listing
      assert all("email" in entry for entry in listing["data"])
  def test_update_user_me(
      client: TestClient, normal_user_token_headers: dict[str, str], db: Session
  ) -> None:
      """PATCH /users/me as the normal user and check the new name and email are echoed."""
      new_name = "Updated Name"
      new_email = random_email()
      response = client.patch(
          f"{settings.API_V1_STR}/users/me",
          headers=normal_user_token_headers,
          json={"full_name": new_name, "email": new_email},
      )
      assert response.status_code == 200
      body = response.json()
      assert body["email"] == new_email
      assert body["full_name"] == new_name
  def test_update_password_me(
      client: TestClient, superuser_token_headers: dict[str, str], db: Session
  ) -> None:
      """Change the superuser password via PATCH /users/me/password, then restore it.

      Once the change request has succeeded, the restore request is issued from
      a ``finally`` clause. A failing assertion on the response body therefore
      cannot leave the superuser with a random password for later tests that
      rely on ``settings.FIRST_SUPERUSER_PASSWORD``.
      """
      password_url = f"{settings.API_V1_STR}/users/me/password"
      new_password = random_lower_string()
      r = client.patch(
          password_url,
          headers=superuser_token_headers,
          json={
              "current_password": settings.FIRST_SUPERUSER_PASSWORD,
              "new_password": new_password,
          },
      )
      # If the change itself was rejected there is nothing to revert, so this
      # check stays outside the try/finally.
      assert r.status_code == 200
      try:
          updated_user = r.json()
          assert updated_user["message"] == "Password updated successfully"
      finally:
          # Revert to the old password to keep consistency in test
          r = client.patch(
              password_url,
              headers=superuser_token_headers,
              json={
                  "current_password": new_password,
                  "new_password": settings.FIRST_SUPERUSER_PASSWORD,
              },
          )
      assert r.status_code == 200
  def test_update_password_me_incorrect_password(
      client: TestClient, superuser_token_headers: dict[str, str], db: Session
  ) -> None:
      """Sending a random string as the current password yields 400 "Incorrect password"."""
      random_password = random_lower_string()
      # The same random value is sent as both the current and the new password.
      payload = {
          "current_password": random_password,
          "new_password": random_password,
      }
      response = client.patch(
          f"{settings.API_V1_STR}/users/me/password",
          headers=superuser_token_headers,
          json=payload,
      )
      assert response.status_code == 400
      body = response.json()
      assert body["detail"] == "Incorrect password"
  def test_update_user_me_email_exists(
      client: TestClient, normal_user_token_headers: dict[str, str], db: Session
  ) -> None:
      """PATCH /users/me with another stored user's email is rejected with 409."""
      other_email = random_email()
      other_secret = random_lower_string()
      other_user = crud.create_user(
          session=db,
          user_create=UserCreate(email=other_email, password=other_secret),
      )
      # Try to take over the email that now belongs to the other user.
      response = client.patch(
          f"{settings.API_V1_STR}/users/me",
          headers=normal_user_token_headers,
          json={"email": other_user.email},
      )
      assert response.status_code == 409
      assert response.json()["detail"] == "User with this email already exists"
  def test_update_password_me_same_password_error(
      client: TestClient, superuser_token_headers: dict[str, str], db: Session
  ) -> None:
      """Re-submitting the current superuser password as the new one yields 400."""
      unchanged_password = settings.FIRST_SUPERUSER_PASSWORD
      response = client.patch(
          f"{settings.API_V1_STR}/users/me/password",
          headers=superuser_token_headers,
          json={
              "current_password": unchanged_password,
              "new_password": unchanged_password,
          },
      )
      assert response.status_code == 400
      body = response.json()
      expected_detail = "New password cannot be the same as the current one"
      assert body["detail"] == expected_detail
  def test_create_user_open(client: TestClient) -> None:
      """With USERS_OPEN_REGISTRATION patched to True, POST /users/open creates a user."""
      with patch("app.core.config.settings.USERS_OPEN_REGISTRATION", True):
          email = random_email()
          secret = random_lower_string()
          name = random_lower_string()
          signup = {"email": email, "password": secret, "full_name": name}
          response = client.post(f"{settings.API_V1_STR}/users/open", json=signup)
          assert response.status_code == 200
          body = response.json()
          # The response must echo the submitted email and full name.
          assert body["email"] == email
          assert body["full_name"] == name
  def test_create_user_open_forbidden_error(client: TestClient) -> None:
      """With USERS_OPEN_REGISTRATION patched to False, POST /users/open returns 403."""
      with patch("app.core.config.settings.USERS_OPEN_REGISTRATION", False):
          email = random_email()
          secret = random_lower_string()
          name = random_lower_string()
          signup = {"email": email, "password": secret, "full_name": name}
          response = client.post(f"{settings.API_V1_STR}/users/open", json=signup)
          assert response.status_code == 403
          expected_detail = "Open user registration is forbidden on this server"
          assert response.json()["detail"] == expected_detail
  def test_create_user_open_already_exists_error(client: TestClient) -> None:
      """Open registration with the first superuser's email is rejected with 400."""
      with patch("app.core.config.settings.USERS_OPEN_REGISTRATION", True):
          secret = random_lower_string()
          name = random_lower_string()
          # Reuse the first superuser's email to trigger the duplicate check.
          signup = {
              "email": settings.FIRST_SUPERUSER,
              "password": secret,
              "full_name": name,
          }
          response = client.post(f"{settings.API_V1_STR}/users/open", json=signup)
          assert response.status_code == 400
          expected_detail = "The user with this email already exists in the system"
          assert response.json()["detail"] == expected_detail
  def test_update_user(
      client: TestClient, superuser_token_headers: dict[str, str], db: Session
  ) -> None:
      """As superuser, PATCH a freshly created user's full name and check the response."""
      email = random_email()
      secret = random_lower_string()
      target = crud.create_user(
          session=db, user_create=UserCreate(email=email, password=secret)
      )
      response = client.patch(
          f"{settings.API_V1_STR}/users/{target.id}",
          headers=superuser_token_headers,
          json={"full_name": "Updated_full_name"},
      )
      assert response.status_code == 200
      body = response.json()
      assert body["full_name"] == "Updated_full_name"
  def test_update_user_not_exists(
      client: TestClient, superuser_token_headers: dict[str, str], db: Session
  ) -> None:
      """PATCH /users/99999999 as superuser returns 404 with the not-exist message."""
      target_url = f"{settings.API_V1_STR}/users/99999999"
      response = client.patch(
          target_url,
          headers=superuser_token_headers,
          json={"full_name": "Updated_full_name"},
      )
      expected_detail = "The user with this id does not exist in the system"
      assert response.status_code == 404
      assert response.json()["detail"] == expected_detail
  def test_update_user_email_exists(
      client: TestClient, superuser_token_headers: dict[str, str], db: Session
  ) -> None:
      """Giving one user the email of another stored user is rejected with 409."""
      first_email = random_email()
      first_secret = random_lower_string()
      first_user = crud.create_user(
          session=db,
          user_create=UserCreate(email=first_email, password=first_secret),
      )

      second_email = random_email()
      second_secret = random_lower_string()
      second_user = crud.create_user(
          session=db,
          user_create=UserCreate(email=second_email, password=second_secret),
      )

      # Try to assign the second user's email to the first user.
      response = client.patch(
          f"{settings.API_V1_STR}/users/{first_user.id}",
          headers=superuser_token_headers,
          json={"email": second_user.email},
      )
      assert response.status_code == 409
      assert response.json()["detail"] == "User with this email already exists"
  def test_delete_user_super_user(
      client: TestClient, superuser_token_headers: dict[str, str], db: Session
  ) -> None:
      """As superuser, DELETE a freshly created user and check the success message."""
      email = random_email()
      secret = random_lower_string()
      victim = crud.create_user(
          session=db, user_create=UserCreate(email=email, password=secret)
      )
      response = client.delete(
          f"{settings.API_V1_STR}/users/{victim.id}",
          headers=superuser_token_headers,
      )
      assert response.status_code == 200
      body = response.json()
      assert body["message"] == "User deleted successfully"
  def test_delete_user_current_user(client: TestClient, db: Session) -> None:
      """Create a user, log in as that user, and have the user delete its own account."""
      email = random_email()
      secret = random_lower_string()
      created = crud.create_user(
          session=db, user_create=UserCreate(email=email, password=secret)
      )
      target_id = created.id

      # Obtain a bearer token for the new user through the login endpoint.
      login_response = client.post(
          f"{settings.API_V1_STR}/login/access-token",
          data={"username": email, "password": secret},
      )
      access_token = login_response.json()["access_token"]
      auth_headers = {"Authorization": f"Bearer {access_token}"}

      response = client.delete(
          f"{settings.API_V1_STR}/users/{target_id}",
          headers=auth_headers,
      )
      assert response.status_code == 200
      body = response.json()
      assert body["message"] == "User deleted successfully"
  def test_delete_user_not_found(
      client: TestClient, superuser_token_headers: dict[str, str], db: Session
  ) -> None:
      """DELETE /users/99999999 as superuser returns 404 "User not found"."""
      target_url = f"{settings.API_V1_STR}/users/99999999"
      response = client.delete(target_url, headers=superuser_token_headers)
      assert response.status_code == 404
      assert response.json()["detail"] == "User not found"
  def test_delete_user_current_super_user_error(
      client: TestClient, superuser_token_headers: dict[str, str], db: Session
  ) -> None:
      """The first superuser trying to delete its own account gets a 403."""
      me = crud.get_user_by_email(session=db, email=settings.FIRST_SUPERUSER)
      assert me
      # Target the superuser's own id while using the superuser headers.
      response = client.delete(
          f"{settings.API_V1_STR}/users/{me.id}",
          headers=superuser_token_headers,
      )
      expected_detail = "Super users are not allowed to delete themselves"
      assert response.status_code == 403
      assert response.json()["detail"] == expected_detail
  def test_delete_user_without_privileges(
      client: TestClient, normal_user_token_headers: dict[str, str], db: Session
  ) -> None:
      """A normal user deleting a different user gets 403 with the privileges message."""
      email = random_email()
      secret = random_lower_string()
      other_user = crud.create_user(
          session=db, user_create=UserCreate(email=email, password=secret)
      )
      response = client.delete(
          f"{settings.API_V1_STR}/users/{other_user.id}",
          headers=normal_user_token_headers,
      )
      assert response.status_code == 403
      assert response.json()["detail"] == "The user doesn't have enough privileges"