import datetime
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, Optional

import whois

from .push import EmailPush


def _parse_local_naive(stamp: str) -> datetime.datetime:
    """Parse an ISO-style timestamp string into a naive local datetime.

    Accepts the ``str(datetime)`` form (optional microseconds / UTC offset)
    as well as ``"%Y-%m-%d %H:%M:%S"``. Offset-aware values are converted to
    local time and stripped of their tzinfo so that they can be subtracted
    from naive local timestamps.
    """
    parsed = datetime.datetime.fromisoformat(stamp.strip())
    if parsed.tzinfo is not None:
        parsed = parsed.astimezone().replace(tzinfo=None)
    return parsed


class DomainNotify(object):
    """Domain-expiry notifier: checks WHOIS data and pushes an e-mail."""

    def __init__(self):
        super().__init__()

    def crawl(self, domain: str) -> None:
        """Check whether ``domain`` is close to expiring and push an e-mail.

        The notification date is one day before the WHOIS expiration date;
        an e-mail is pushed when that date is at most three days away (or
        already in the past).

        Any failure (lookup error, missing/unparseable date, push error) is
        printed and swallowed so that one bad domain does not abort a batch.

        :param domain: domain name to look up
        :return: None
        """
        try:
            whi = whois.whois(domain)
            expiration_date = whi.expiration_date
            # The WHOIS library may hand back a list when the record carries
            # several expiration values; use the earliest to stay on the
            # safe side.
            if isinstance(expiration_date, (list, tuple)):
                expiration_date = min(expiration_date) if expiration_date else None
            if expiration_date is None:
                raise ValueError(
                    "no expiration date in WHOIS record for {}".format(domain)
                )

            notify_date = str(expiration_date - datetime.timedelta(days=1))
            today = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            days = DomainNotify.getDays(notify_date, today)
            if days <= 3:
                push = EmailPush()
                push.push()
        except Exception as e:
            # Best-effort: report the problem and carry on.
            print(e)
        # NOTE(review): the result push is issued for every domain, whether
        # or not the check above succeeded -- confirm this is the intent.
        self.notify(domain)

    @staticmethod
    def getDays(notifyDay: str, today: str) -> int:
        """Return the number of whole days from ``today`` until ``notifyDay``.

        Both arguments are timestamp strings such as ``"2024-05-01 12:00:00"``.
        The result is floored (``timedelta.days``), so it is negative once
        ``notifyDay`` lies in the past.

        :param notifyDay: timestamp of the notification date
        :param today: timestamp of the current moment
        :return: whole days between the two timestamps
        :raises ValueError: if either string cannot be parsed
        """
        notify_at = _parse_local_naive(notifyDay)
        now = _parse_local_naive(today)
        return (notify_at - now).days

    def notify(self, domain):
        """Push the result for ``domain`` (not implemented yet)."""
        pass

    @staticmethod
    def saveRes(damin: str, res: bool):
        """Persist the check result for a domain (not implemented yet).

        :param damin: domain name (parameter name kept for compatibility)
        :param res: result of the check
        """
        # TODO: persist the result, e.g. to MySQL or to a file.
        pass

    def run(self, domains: Optional[Iterable[str]] = None) -> None:
        """Check every domain in ``domains`` concurrently.

        Uses a pool of 10 worker threads and returns once all checks have
        finished. Results are not persisted here; ``saveRes`` is the
        placeholder hook for that.

        :param domains: domain names to check; a single string is treated
            as one domain. ``None`` (the default) checks nothing.
        """
        if domains is None:
            domains = ()
        elif isinstance(domains, str):
            # Avoid iterating a lone domain name character by character.
            domains = (domains,)

        # The context manager waits for all submitted checks and releases
        # the worker threads.
        with ThreadPoolExecutor(max_workers=10) as pool:
            for domain in domains:
                pool.submit(self.crawl, domain)


if __name__ == '__main__':
    sd = DomainNotify()
    # Domains to check are taken from the command line.
    sd.run(sys.argv[1:])