# domain_notify.py (1.4 KB)

import datetime
import time
from concurrent.futures import ThreadPoolExecutor

import whois

from .push import EmailPush
  5. class DomainNotify(object):
  6. """域名到期推送"""
  7. def __init__(self):
  8. super(DomainNotify,self).__init__()
  9. def crawl(self, domain:str)->None:
  10. '''
  11. 检测域名是否快到期,推送邮件
  12. :params domain 域名:
  13. :return true or false'''
  14. try:
  15. whi = whois.whois(domain)
  16. expirationDate= whi.expiration_date
  17. notifyDate = str(expirationDate - time.datetime.timedelta(days=1))
  18. today=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  19. days=DomainNotify.getDays(notifyDate,today)
  20. if days <=3:
  21. push=EmailPush()
  22. push.push()
  23. except Exception as e:
  24. print(e)
  25. self.notify(domain)
  26. @staticmethod
  27. def getDays(notifyDay:str, today:str):
  28. pass
  29. def notify(self,domain):
  30. '''结果推送'''
  31. pass
  32. def saveRes(damin:str, res:bool):
  33. # mysql.save()
  34. # file.save()
  35. pass
  36. def run(self):
  37. with open("res/res.json","w",encoding="utf8") as file:
  38. pool=ThreadPoolExecutor(max_workers=10)
  39. for i in range(100):
  40. pool.submit(self.crawl, domain)
  41. if __name__ == '__main__':
  42. sd = DomainNotify()
  43. sd.run()