import whois
from concurrent.futures import ThreadPoolExecutor
import logging
import os
import argparse
from . import db


class SearchDomain(object):
    """Search for available domains and save the results."""

    def __init__(self, params: dict, debug=False, export_all=True):
        '''
        Initialize the searcher.
        debug       enable debug logging
        export_all  if True, export every domain checked; otherwise only available domains are exported
        '''
        super(SearchDomain, self).__init__()
        self.params = params
        self.export_all = export_all
        self.input = params["input"]
        self.output = params["output"]
        if debug:
            logging.basicConfig(level=logging.DEBUG)

    def crawl(self, domain: str, index: int) -> None:
        '''
        Check whether a domain is available.
        :param domain: the domain to check
        :param index: position of the domain in the input file, used for logging
        '''
        res = False
        try:
            whois.whois(domain)
            res = False
            logging.info(str(index) + ": searching domain: " + domain + " is unavailable.")
        except Exception as e:
            # An unregistered domain raises an exception whose message
            # starts with "No match".
            if str(e).startswith("No match"):
                res = True
                logging.info(str(index) + ": searching domain: " + domain + " is available.")
            else:
                res = False
                logging.error(e)
        if self.export_all or res:
            self.saveRes(domain, res)

    def saveRes(self, domain: str, res: bool):
        """Save one result line to the output file."""
        # db.Mysql().save()
        db.File().save(os.path.join(self.params["app_path"], self.output), domain + " " + str(res))

    def run(self):
        '''Read domains from the input file (one per line) and check each one.'''
        with open(os.path.join(self.params["app_path"], self.input), "r", encoding="utf8", errors="ignore") as file:
            pool = ThreadPoolExecutor(max_workers=5)
            for index, line in enumerate(file, start=1):
                pool.submit(self.crawl, line.strip(), index)
            # Wait for all submitted checks to finish before returning.
            pool.shutdown(wait=True)


if __name__ == '__main__':
    # SearchDomain requires a params dict; the values below are placeholders.
    params = {"app_path": os.path.dirname(os.path.abspath(__file__)),
              "input": "domains.txt", "output": "result.txt"}
    sd = SearchDomain(params)
    sd.run()
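
The local `db` module imported at the top is not shown in this listing. Below is a minimal sketch of what its `File` helper could look like, inferred only from the `db.File().save(path, line)` call in `saveRes`; the implementation is an assumption, not the original code.

# db.py (assumed) -- interface inferred from SearchDomain.saveRes
class File(object):
    """Append a single result line to a plain-text file."""

    def save(self, path: str, line: str) -> None:
        # Append so results from concurrent crawl() calls accumulate
        # instead of overwriting each other.
        with open(path, "a", encoding="utf8") as f:
            f.write(line + "\n")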
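
`argparse` is imported but never used in this snippet, so the `params` dict is presumably assembled elsewhere from command-line arguments. One possible way to build it is sketched below; the flag names and defaults are assumptions.

# Hypothetical helper: builds the params dict SearchDomain expects.
import argparse
import os

def build_params() -> dict:
    parser = argparse.ArgumentParser(description="Search for available domains")
    parser.add_argument("--input", default="domains.txt", help="file with one domain per line")
    parser.add_argument("--output", default="result.txt", help="file the results are written to")
    parser.add_argument("--app-path", dest="app_path", default=os.getcwd(),
                        help="base directory used to resolve the input/output paths")
    args = parser.parse_args()
    return {"app_path": args.app_path, "input": args.input, "output": args.output}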