# searchdomain.py
  1. import whois
  2. from concurrent.futures import ThreadPoolExecutor
  3. import os
  4. import sys
  5. import re
  6. import json
  7. import logging
  8. import argparse
  9. from . import db
  10. class SearchDomain(object):
  11. """search avaliable domain and save result"""
  12. def __init__(self, debug=False, export_all=False):
  13. '''
  14. 初始化
  15. debug 调试模式
  16. export_all 是否导出所有域名,默认导出可用域名
  17. return:
  18. '''
  19. super(SearchDomain, self).__init__()
  20. self.export_all=export_all
  21. parser = argparse.ArgumentParser(description='Demo of argparse')
  22. parser.add_argument(
  23. "--input", help="set input domain list file,eg: domain.txt", type=str, default="domain.txt")
  24. parser.add_argument(
  25. "--output", help="set output domain result list file,eg: result.txt", type=str, default="result.txt")
  26. args = parser.parse_args()
  27. if args.input:
  28. self.input = args.input
  29. if args.output:
  30. self.output = args.output
  31. if debug == True:
  32. logging.basicConfig(level=logging.DEBUG)
  33. def crawl(self, domain: str) -> None:
  34. '''
  35. 检测域名是否可用
  36. :params domain 域名:
  37. :return true or false'''
  38. res = False
  39. try:
  40. whi = whois.whois(domain)
  41. res = False
  42. logging.info("searching domain:"+ domain + " is unavaliable.")
  43. except Exception as e:
  44. if(str(e).index("No match") == 0):
  45. res = True
  46. logging.info("searching domain:"+ domain +" is avaliable.")
  47. else:
  48. res = False
  49. logging.error(e)
  50. if self.export_all:
  51. self.saveRes(domain, res)
  52. else:
  53. if res:
  54. self.saveRes(domain, res)
  55. def saveRes(self, domain: str, res: bool):
  56. # db.Mysql().save()
  57. db.File().save(self.output, domain + " " + str(res))
  58. def run(self):
  59. '''begin search domain'''
  60. with open(self.input, "r", encoding="utf8", errors="ignore") as file:
  61. pool = ThreadPoolExecutor(max_workers=10)
  62. for line in file.readlines():
  63. pool.submit(self.crawl, line.strip())
  64. if __name__ == '__main__':
  65. sd = SearchDomain()
  66. sd.run()