utils.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. from app.api.deps import get_current_active_superuser
  2. from app.models.models import Message
  3. from app.utils.mail_util import generate_test_email, send_email
  4. from pydantic.networks import EmailStr
  5. from fastapi import APIRouter, BackgroundTasks, Depends
  6. from fastapi.templating import Jinja2Templates
# Router object that the endpoint functions in this module register on
# through the @router.get / @router.post decorators.
router = APIRouter()
# Jinja2 template loader pointed at ./coronavirus/templates.
# NOTE(review): the path is relative — presumably resolved against the
# process working directory; confirm against how the app is launched.
templates = Jinja2Templates(directory="./coronavirus/templates")
  9. @router.post(
  10. "/test-email/",
  11. dependencies=[Depends(get_current_active_superuser)],
  12. status_code=201,
  13. )
  14. def test_email(email_to: EmailStr) -> Message:
  15. """
  16. Test emails.
  17. """
  18. email_data = generate_test_email(email_to=email_to)
  19. send_email(
  20. email_to=email_to,
  21. subject=email_data.subject,
  22. html_content=email_data.html_content,
  23. )
  24. return Message(message="Test email sent")
  25. @router.get("/test2")
  26. async def test():
  27. return {
  28. "code": 200,
  29. "message": "hello world",
  30. "data": [{"name": "张三", "sex": "男", "age": 19}],
  31. }
  32. @router.get("/city/{city}")
  33. async def get_city(city: str, query_string: str | None = None):
  34. return {"city": city, "query_string": query_string}
  35. @router.post("/background_tasks")
  36. async def run_bg_task(framework: str, background_tasks: BackgroundTasks):
  37. """
  38. :param framework: 被调用的后台任务函数的参数
  39. :param background_tasks: FastAPI.BackgroundTasks
  40. :return:
  41. """
  42. background_tasks.add_task(bg_task, framework)
  43. return {"message": "任务已在后台运行"}
  44. def bg_task(framework: str):
  45. with open("README.md", mode="a") as f:
  46. f.write(f"## {framework} 框架精讲")