# mail_util.py (3.8 KB)
import logging
import random
import secrets
import string
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

import emails  # type: ignore
from jinja2 import Template
from jose import JWTError, jwt

from app.config import settings
  12. @dataclass
  13. class EmailData:
  14. html_content: str
  15. subject: str
  16. def render_email_template(*, template_name: str, context: dict[str, Any]) -> str:
  17. template_str = (
  18. Path(__file__).parent / "email-templates" / "build" / template_name
  19. ).read_text()
  20. html_content = Template(template_str).render(context)
  21. return html_content
  22. def send_email(
  23. *,
  24. email_to: str,
  25. subject: str = "",
  26. html_content: str = "",
  27. ) -> None:
  28. assert settings.emails_enabled, "no provided configuration for email variables"
  29. message = emails.Message(
  30. subject=subject,
  31. html=html_content,
  32. mail_from=(settings.EMAILS_FROM_NAME, settings.EMAILS_FROM_EMAIL),
  33. )
  34. smtp_options = {"host": settings.SMTP_HOST, "port": settings.SMTP_PORT}
  35. if settings.SMTP_TLS:
  36. smtp_options["tls"] = True
  37. elif settings.SMTP_SSL:
  38. smtp_options["ssl"] = True
  39. if settings.SMTP_USER:
  40. smtp_options["user"] = settings.SMTP_USER
  41. if settings.SMTP_PASSWORD:
  42. smtp_options["password"] = settings.SMTP_PASSWORD
  43. response = message.send(to=email_to, smtp=smtp_options)
  44. logging.info(f"send email result: {response}")
  45. def generate_test_email(email_to: str) -> EmailData:
  46. project_name = settings.PROJECT_NAME
  47. subject = f"{project_name} - Test email"
  48. html_content = render_email_template(
  49. template_name="test_email.html",
  50. context={"project_name": settings.PROJECT_NAME, "email": email_to},
  51. )
  52. return EmailData(html_content=html_content, subject=subject)
  53. def generate_reset_password_email(email_to: str, email: str, token: str) -> EmailData:
  54. project_name = settings.PROJECT_NAME
  55. subject = f"{project_name} - Password recovery for user {email}"
  56. link = f"{settings.server_host}/reset-password?token={token}"
  57. html_content = render_email_template(
  58. template_name="reset_password.html",
  59. context={
  60. "project_name": settings.PROJECT_NAME,
  61. "username": email,
  62. "email": email_to,
  63. "valid_hours": settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS,
  64. "link": link,
  65. },
  66. )
  67. return EmailData(html_content=html_content, subject=subject)
  68. def generate_new_account_email(
  69. email_to: str, username: str, password: str
  70. ) -> EmailData:
  71. project_name = settings.PROJECT_NAME
  72. subject = f"{project_name} - New account for user {username}"
  73. html_content = render_email_template(
  74. template_name="new_account.html",
  75. context={
  76. "project_name": settings.PROJECT_NAME,
  77. "username": username,
  78. "password": password,
  79. "email": email_to,
  80. "link": settings.server_host,
  81. },
  82. )
  83. return EmailData(html_content=html_content, subject=subject)
  84. def generate_password_reset_token(email: str) -> str:
  85. delta = timedelta(hours=settings.EMAIL_RESET_TOKEN_EXPIRE_HOURS)
  86. now = datetime.utcnow()
  87. expires = now + delta
  88. exp = expires.timestamp()
  89. encoded_jwt = jwt.encode(
  90. {"exp": exp, "nbf": now, "sub": email},
  91. settings.SECRET_KEY,
  92. algorithm="HS256",
  93. )
  94. return encoded_jwt
  95. def verify_password_reset_token(token: str) -> str | None:
  96. try:
  97. decoded_token = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
  98. return str(decoded_token["sub"])
  99. except JWTError:
  100. return None
  101. logger = logging.getLogger(__name__)
  102. ALPHA_NUM = string.ascii_letters + string.digits
  103. def generate_random_alphanum(length: int = 20) -> str:
  104. return "".join(random.choices(ALPHA_NUM, k=length))