docker.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. import json
  2. import os
  3. from typing import Any
  4. from docker.client import DockerClient
  5. from docker.errors import NotFound
  6. from docker.models.containers import Container
# Name used to look up the test container (see remove_previous_container).
CONTAINER_NAME = "uvicorn-gunicorn-fastapi-test"
  8. def get_process_names(container: Container) -> list[str]:
  9. top = container.top()
  10. process_commands = [p[7] for p in top["Processes"]]
  11. gunicorn_processes = [p for p in process_commands if "gunicorn" in p]
  12. return gunicorn_processes
  13. def get_gunicorn_conf_path(container: Container) -> str:
  14. gunicorn_processes = get_process_names(container)
  15. first_process = gunicorn_processes[0]
  16. first_part, partition, last_part = first_process.partition("-c")
  17. gunicorn_conf = last_part.strip().split()[0]
  18. return gunicorn_conf
  19. def get_config(container: Container) -> dict[str, Any]:
  20. gunicorn_conf = get_gunicorn_conf_path(container)
  21. result = container.exec_run(f"python {gunicorn_conf}")
  22. return json.loads(result.output.decode())
  23. def remove_previous_container(client: DockerClient) -> None:
  24. try:
  25. previous = client.containers.get(CONTAINER_NAME)
  26. previous.stop()
  27. previous.remove()
  28. except NotFound:
  29. return None
  30. def get_logs(container: DockerClient) -> str:
  31. logs = container.logs()
  32. return logs.decode("utf-8")
  33. def get_response_text1() -> str:
  34. python_version = os.getenv("PYTHON_VERSION")
  35. return f"Hello world! From FastAPI running on Uvicorn with Gunicorn. Using Python {python_version}"