monkey.py 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import os, sys
  2. from tempfile import NamedTemporaryFile
  3. from subprocess import Popen
  4. from fuse import FUSE, Operations, LoggingMixIn
  5. try:
  6. from urllib.request import urlopen # Python 3
  7. except ImportError:
  8. from urllib2 import urlopen # Python 2
  9. # domain with server.py running on it for testing
  10. DOMAIN = os.getenv("TRAVIS_DOMAIN", "travis-ci.gethttpsforfree.com")
  11. # generate account and domain keys
  12. def gen_keys():
  13. # good account key
  14. account_key = NamedTemporaryFile()
  15. Popen(["openssl", "genrsa", "-out", account_key.name, "2048"]).wait()
  16. # weak 1024 bit key
  17. weak_key = NamedTemporaryFile()
  18. Popen(["openssl", "genrsa", "-out", weak_key.name, "1024"]).wait()
  19. # good domain key
  20. domain_key = NamedTemporaryFile()
  21. domain_csr = NamedTemporaryFile()
  22. Popen(["openssl", "req", "-newkey", "rsa:2048", "-nodes", "-keyout", domain_key.name,
  23. "-subj", "/CN={0}".format(DOMAIN), "-out", domain_csr.name]).wait()
  24. # subject alt-name domain
  25. san_csr = NamedTemporaryFile()
  26. san_conf = NamedTemporaryFile()
  27. san_conf.write(open("/etc/ssl/openssl.cnf").read().encode("utf8"))
  28. san_conf.write("\n[SAN]\nsubjectAltName=DNS:{0}\n".format(DOMAIN).encode("utf8"))
  29. san_conf.seek(0)
  30. Popen(["openssl", "req", "-new", "-sha256", "-key", domain_key.name,
  31. "-subj", "/", "-reqexts", "SAN", "-config", san_conf.name,
  32. "-out", san_csr.name]).wait()
  33. # invalid domain csr
  34. invalid_csr = NamedTemporaryFile()
  35. Popen(["openssl", "req", "-new", "-sha256", "-key", domain_key.name,
  36. "-subj", "/CN=\xC3\xA0\xC2\xB2\xC2\xA0_\xC3\xA0\xC2\xB2\xC2\xA0.com", "-out", invalid_csr.name]).wait()
  37. # nonexistent domain csr
  38. nonexistent_csr = NamedTemporaryFile()
  39. Popen(["openssl", "req", "-new", "-sha256", "-key", domain_key.name,
  40. "-subj", "/CN=404.gethttpsforfree.com", "-out", nonexistent_csr.name]).wait()
  41. # account-signed domain csr
  42. account_csr = NamedTemporaryFile()
  43. Popen(["openssl", "req", "-new", "-sha256", "-key", account_key.name,
  44. "-subj", "/CN={0}".format(DOMAIN), "-out", account_csr.name]).wait()
  45. return {
  46. "account_key": account_key,
  47. "weak_key": weak_key,
  48. "domain_key": domain_key,
  49. "domain_csr": domain_csr,
  50. "san_csr": san_csr,
  51. "invalid_csr": invalid_csr,
  52. "nonexistent_csr": nonexistent_csr,
  53. "account_csr": account_csr,
  54. }
  55. # fake a folder structure to catch the key authorization file
  56. FS = {}
  57. class Passthrough(LoggingMixIn, Operations): # pragma: no cover
  58. def getattr(self, path, fh=None):
  59. f = FS.get(path, None)
  60. if f is None:
  61. return super(Passthrough, self).getattr(path, fh=fh)
  62. return f
  63. def write(self, path, buf, offset, fh):
  64. urlopen("http://{0}/.well-known/acme-challenge/?{1}".format(DOMAIN,
  65. os.getenv("TRAVIS_SESSION", "not_set")), buf)
  66. return len(buf)
  67. def create(self, path, mode, fi=None):
  68. FS[path] = {"st_mode": 33204}
  69. return 0
  70. def unlink(self, path):
  71. del(FS[path])
  72. return 0
  73. if __name__ == "__main__": # pragma: no cover
  74. FUSE(Passthrough(), sys.argv[1], nothreads=True, foreground=True)