import whois from concurrent.futures import ThreadPoolExecutor import os import sys import re import json import logging import argparse from . import db class SearchDomain(object): """search avaliable domain and save result""" def __init__(self, debug=False, export_all=False): ''' 初始化 debug 调试模式 export_all 是否导出所有域名,默认导出可用域名 return: ''' super(SearchDomain, self).__init__() self.export_all=export_all parser = argparse.ArgumentParser(description='Demo of argparse') parser.add_argument( "--input", help="set input domain list file,eg: domain.txt", type=str, default="domain.txt") parser.add_argument( "--output", help="set output domain result list file,eg: result.txt", type=str, default="result.txt") args = parser.parse_args() if args.input: self.input = args.input if args.output: self.output = args.output if debug == True: logging.basicConfig(level=logging.DEBUG) def crawl(self, domain: str) -> None: ''' 检测域名是否可用 :params domain 域名: :return true or false''' res = False try: whi = whois.whois(domain) res = False logging.info("searching domain:"+ domain + " is unavaliable.") except Exception as e: if(str(e).index("No match") == 0): res = True logging.info("searching domain:"+ domain +" is avaliable.") else: res = False logging.error(e) if self.export_all: self.saveRes(domain, res) else: if res: self.saveRes(domain, res) def saveRes(self, domain: str, res: bool): # db.Mysql().save() db.File().save(self.output, domain + " " + str(res)) def run(self): '''begin search domain''' with open(self.input, "r", encoding="utf8", errors="ignore") as file: pool = ThreadPoolExecutor(max_workers=10) for line in file.readlines(): pool.submit(self.crawl, line.strip()) if __name__ == '__main__': sd = SearchDomain() sd.run()