12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- # URL helpers, see https://github.com/NVlabs/stylegan
- # ------------------------------------------------------------------------------------------
- import requests
- import html
- import hashlib
- import glob
- import os
- import io
- from typing import Any
- import re
- import uuid
def is_url(obj: Any) -> bool:
    """Determine whether the given object is a valid URL string.

    A value qualifies only if it is a ``str`` containing ``"://"`` whose
    parsed form has a non-empty scheme and a network location containing at
    least one dot. Consequently ``http://localhost/x`` and ``file:///x`` are
    rejected.

    Args:
        obj: Any object; anything that is not a ``str`` is never a URL.

    Returns:
        True if ``obj`` parses as a URL meeting the criteria above, False
        otherwise (including when parsing itself fails).
    """
    # Function-scope import keeps this block self-contained; on Python 3
    # these are the same callables that ``requests.compat`` re-exports.
    from urllib.parse import urljoin, urlparse

    if not isinstance(obj, str) or "://" not in obj:
        return False
    try:
        # Validate the URL as given, then again after resolving it to the
        # site root, so both forms must carry a scheme and a dotted host.
        for candidate in (obj, urljoin(obj, "/")):
            parts = urlparse(candidate)
            if not parts.scheme or not parts.netloc or "." not in parts.netloc:
                return False
    except ValueError:
        # urlparse/urljoin signal malformed input (e.g. an unterminated IPv6
        # bracket) with ValueError; anything else should propagate.
        return False
    return True
def open_url(url: str, cache_dir: str = None, num_attempts: int = 10, verbose: bool = True, return_path: bool = False) -> Any:
    """Download the given URL and return a binary-mode file object to access the data.

    Args:
        url: URL to fetch; must satisfy ``is_url``.
        cache_dir: Optional directory used as a download cache. Entries are
            named ``<md5 of url>_<sanitized name>``. When exactly one entry
            matches the URL's hash, it is used and nothing is downloaded.
        num_attempts: Maximum number of download attempts (at least 1).
        verbose: If True, print progress to stdout.
        return_path: If True and ``cache_dir`` is set, return the path of the
            cached file instead of a file object. Without ``cache_dir`` there
            is no file on disk, so an in-memory file object is returned
            regardless.

    Returns:
        A binary file object (an open file for a cache hit, otherwise
        ``io.BytesIO``), or the cache file path as described above.

    Raises:
        AssertionError: If ``url`` is not a valid URL or ``num_attempts < 1``.
        Exception: The error from the final attempt is re-raised once all
            attempts are exhausted.
    """
    assert is_url(url)
    assert num_attempts >= 1

    # Lookup from cache. The hash is taken from the URL as passed in, before
    # any redirect rewriting below, so later calls find the same entry.
    url_md5 = hashlib.md5(url.encode("utf-8")).hexdigest()
    if cache_dir is not None:
        cache_files = glob.glob(os.path.join(cache_dir, url_md5 + "_*"))
        if len(cache_files) == 1:
            if return_path:
                return cache_files[0]
            return open(cache_files[0], "rb")

    # Download.
    url_name = None
    url_data = None
    with requests.Session() as session:
        if verbose:
            print("Downloading %s ..." % url, end="", flush=True)
        for attempts_left in reversed(range(num_attempts)):
            try:
                with session.get(url) as res:
                    res.raise_for_status()
                    if len(res.content) == 0:
                        raise IOError("No data received")

                    # Small responses may be an HTML interstitial rather than
                    # the payload itself.
                    if len(res.content) < 8192:
                        # Decode leniently: the text is only scanned for ASCII
                        # markers, and a strict decode would make every small
                        # non-UTF-8 binary file fail on each attempt.
                        content_str = res.content.decode("utf-8", errors="ignore")
                        if "download_warning" in res.headers.get("Set-Cookie", ""):
                            links = [html.unescape(link) for link in content_str.split('"') if "export=download" in link]
                            if len(links) == 1:
                                # Follow the confirmation link on the next attempt.
                                url = requests.compat.urljoin(url, links[0])
                                raise IOError("Google Drive virus checker nag")
                        if "Google Drive - Quota exceeded" in content_str:
                            raise IOError("Google Drive quota exceeded")

                    match = re.search(r'filename="([^"]*)"', res.headers.get("Content-Disposition", ""))
                    url_name = match[1] if match else url
                    url_data = res.content
                    if verbose:
                        print(" done")
                    break
            except Exception:
                # Retry on ordinary failures only; KeyboardInterrupt and
                # SystemExit are not Exception subclasses and abort at once.
                if not attempts_left:
                    if verbose:
                        print(" failed")
                    raise
                if verbose:
                    print(".", end="", flush=True)

    # Save to cache.
    if cache_dir is not None:
        safe_name = re.sub(r"[^0-9a-zA-Z-._]", "_", url_name)
        cache_file = os.path.join(cache_dir, url_md5 + "_" + safe_name)
        # Write under a unique temporary name first so concurrent readers
        # never observe a partially written cache entry.
        temp_file = os.path.join(cache_dir, "tmp_" + uuid.uuid4().hex + "_" + url_md5 + "_" + safe_name)
        os.makedirs(cache_dir, exist_ok=True)
        with open(temp_file, "wb") as f:
            f.write(url_data)
        os.replace(temp_file, cache_file)  # atomic
        if return_path:
            return cache_file

    # Return data as file object.
    return io.BytesIO(url_data)
|