import argparse
import logging
import os
from concurrent.futures import ThreadPoolExecutor

import whois

from . import db


class SearchDomain(object):
    """Check a list of domains for availability via WHOIS and save the results.

    Domains are read one per line from the input file, looked up
    concurrently, and each result is handed to ``db.File().save``.
    """

    def __init__(self, params: dict, debug=False, export_all=True):
        """Store the run configuration.

        :param params: configuration mapping. ``"input"`` and ``"output"``
            are read here; ``"app_path"`` is read later by :meth:`run` and
            :meth:`saveRes`, which join it with the input/output names.
        :param debug: when truthy, configure the root logger at DEBUG level.
        :param export_all: when True every checked domain is saved;
            when False only domains found to be available are saved.
        :raises KeyError: if ``params`` lacks ``"input"`` or ``"output"``.
        """
        super(SearchDomain, self).__init__()
        self.params = params
        self.export_all = export_all
        self.input = params["input"]
        self.output = params["output"]
        if debug:
            logging.basicConfig(level=logging.DEBUG)

    def crawl(self, domain: str, index: int) -> None:
        """Look up one domain and save the outcome.

        A domain is treated as available only when the WHOIS lookup raises
        an exception whose message starts with ``"No match"``. A successful
        lookup, or any other failure, counts as not available.

        :param domain: domain name to query.
        :param index: 1-based line number of the domain in the input file;
            used only in log messages.
        """
        res = False
        try:
            whois.whois(domain)
        except Exception as e:
            # str.startswith instead of str.index(...) == 0: index() raises
            # ValueError when the text is absent, which used to escape this
            # handler and skip both the error log and the save below.
            if str(e).startswith("No match"):
                res = True
                logging.info("%s: searching domain:%s is avaliable.", index, domain)
            else:
                logging.error(e)
        else:
            logging.info("%s: searching domain:%s is unavaliable.", index, domain)

        if self.export_all or res:
            self.saveRes(domain, res)

    def saveRes(self, domain: str, res: bool):
        """Save one result as the text ``"<domain> <res>"``.

        The target path is ``params["app_path"]`` joined with the output
        name. The actual write is delegated to ``db.File().save``.
        """
        db.File().save(os.path.join(self.params["app_path"], self.output), domain + " " + str(res))

    def run(self):
        """Read the input file and check every non-blank line as a domain.

        Lookups run on a pool of five worker threads. This method returns
        only after all submitted lookups have finished, so the pool is
        always shut down and worker failures can be reported.
        """
        input_path = os.path.join(self.params["app_path"], self.input)
        futures = []
        with open(input_path, "r", encoding="utf8", errors="ignore") as file:
            with ThreadPoolExecutor(max_workers=5) as pool:
                # Number by physical line so log output still matches the
                # input file even though blank lines are skipped.
                for index, line in enumerate(file, start=1):
                    domain = line.strip()
                    if not domain:
                        continue
                    futures.append(pool.submit(self.crawl, domain, index))

        # Leaving the pool's ``with`` block waited for every task, so this
        # does not block. Without it, an exception raised in a worker
        # (e.g. while saving) would vanish with the unused future.
        for future in futures:
            error = future.exception()
            if error is not None:
                logging.error("domain check failed: %s", error)


if __name__ == '__main__':
    # SearchDomain requires a params mapping; build it from the command line.
    parser = argparse.ArgumentParser(
        description="Check domains listed in a file for availability.")
    parser.add_argument("input", help="input file name, one domain per line")
    parser.add_argument("output", help="output file name for the results")
    parser.add_argument("--app-path", default=os.getcwd(),
                        help="directory that input and output are relative to "
                             "(default: current directory)")
    parser.add_argument("--debug", action="store_true",
                        help="enable DEBUG-level logging")
    parser.add_argument("--available-only", action="store_true",
                        help="save only the domains found to be available")
    args = parser.parse_args()

    sd = SearchDomain(
        {"input": args.input, "output": args.output, "app_path": args.app_path},
        debug=args.debug,
        export_all=not args.available_only,
    )
    sd.run()