12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- import whois
- from concurrent.futures import ThreadPoolExecutor
- import os
- import sys
- import re
- import json
- import logging
- import argparse
- from . import db
class SearchDomain(object):
    """Search available domains from a list file and save the results.

    Reads ``--input`` / ``--output`` file paths from the command line,
    checks each domain's WHOIS record on a thread pool, and writes one
    result line per domain through the project ``db`` backend.
    """

    def __init__(self, debug=False, export_all=False):
        """Initialize the searcher.

        :param debug: enable DEBUG-level logging when True.
        :param export_all: when True, export every domain checked;
            otherwise only available domains are written out.
        """
        super(SearchDomain, self).__init__()
        self.export_all = export_all
        parser = argparse.ArgumentParser(description='Demo of argparse')
        parser.add_argument(
            "--input", help="set input domain list file,eg: domain.txt",
            type=str, default="domain.txt")
        parser.add_argument(
            "--output", help="set output domain result list file,eg: result.txt",
            type=str, default="result.txt")
        args = parser.parse_args()
        # Defaults make these truthy in practice; the guards only matter
        # if a caller passes an explicit empty string on the CLI.
        if args.input:
            self.input = args.input
        if args.output:
            self.output = args.output
        if debug:
            logging.basicConfig(level=logging.DEBUG)

    def crawl(self, domain: str) -> None:
        """Check whether *domain* is available and record the result.

        Availability is inferred from the WHOIS lookup: a successful
        lookup means the domain is registered (unavailable); a lookup
        error starting with "No match" means it is free to register.

        :param domain: domain name to check, e.g. ``example.com``.
        """
        res = False
        try:
            whois.whois(domain)
            # Lookup succeeded -> the domain is already registered.
            logging.info("searching domain:" + domain + " is unavaliable.")
        except Exception as e:
            # BUGFIX: the original used str(e).index("No match") == 0,
            # which raises ValueError whenever "No match" is absent from
            # the message, crashing the handler. startswith() is the
            # exact, non-raising equivalent of index(...) == 0.
            if str(e).startswith("No match"):
                res = True
                logging.info("searching domain:" + domain + " is avaliable.")
            else:
                res = False
                logging.error(e)
        # Export everything, or only the available domains.
        if self.export_all or res:
            self.saveRes(domain, res)

    def saveRes(self, domain: str, res: bool):
        """Append one ``"<domain> <True|False>"`` line to the output file."""
        # db.Mysql().save()
        db.File().save(self.output, domain + " " + str(res))

    def run(self):
        """Begin the search: read domains and check them concurrently."""
        with open(self.input, "r", encoding="utf8", errors="ignore") as file:
            # Context manager guarantees the pool is shut down and all
            # pending lookups complete (the original never shut it down).
            with ThreadPoolExecutor(max_workers=10) as pool:
                for line in file:
                    domain = line.strip()
                    if domain:  # skip blank lines instead of querying ""
                        pool.submit(self.crawl, domain)
if __name__ == '__main__':
    # Script entry point: build the searcher and start scanning.
    searcher = SearchDomain()
    searcher.run()
|