# searchdomain.py
  1. import whois
  2. from concurrent.futures import ThreadPoolExecutor
  3. import logging,os
  4. import argparse
  5. from . import db
  6. class SearchDomain(object):
  7. """search avaliable domain and save result"""
  8. def __init__(self, params: dict, debug=False, export_all=True):
  9. '''
  10. 初始化
  11. debug 调试模式
  12. export_all 是否导出所有域名,默认导出可用域名
  13. return:
  14. '''
  15. super(SearchDomain, self).__init__()
  16. self.params = params
  17. self.export_all=export_all
  18. self.input=params["input"]
  19. self.output=params["output"]
  20. if debug == True:
  21. logging.basicConfig(level=logging.DEBUG)
  22. def crawl(self, domain: str, index:int) -> None:
  23. '''
  24. 检测域名是否可用
  25. :params domain 域名:
  26. :return true or false'''
  27. res = False
  28. try:
  29. whi = whois.whois(domain)
  30. res = False
  31. logging.info(str(index) + ": searching domain:"+ domain + " is unavaliable.")
  32. except Exception as e:
  33. if(str(e).index("No match") == 0):
  34. res = True
  35. logging.info(str(index) + ": searching domain:"+ domain +" is avaliable.")
  36. else:
  37. res = False
  38. logging.error(e)
  39. if self.export_all:
  40. self.saveRes(domain, res)
  41. else:
  42. if res:
  43. self.saveRes(domain, res)
  44. def saveRes(self, domain: str, res: bool):
  45. """ save result to file """
  46. # db.Mysql().save()
  47. db.File().save(os.path.join(self.params["app_path"], self.output), domain + " " + str(res))
  48. def run(self):
  49. '''begin search domain'''
  50. with open(os.path.join(self.params["app_path"], self.input), "r", encoding="utf8", errors="ignore") as file:
  51. pool = ThreadPoolExecutor(max_workers=5)
  52. index = 0
  53. for line in file.readlines():
  54. index = index + 1
  55. pool.submit(self.crawl, line.strip(), index)
  56. if __name__ == '__main__':
  57. sd = SearchDomain()
  58. sd.run()