domain_notify.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. import whois
  2. from concurrent.futures import ThreadPoolExecutor
  3. import os,sys,re,json
  4. # import requests
  5. import time
  6. from .push import EmailPush
  7. class DomainNotify(object):
  8. """域名到期推送"""
  9. def __init__(self):
  10. super(DomainNotify,self).__init__()
  11. def crawl(self, domain:str)->None:
  12. '''
  13. 检测域名是否可用
  14. :params domain 域名:
  15. :return true or false'''
  16. res=False
  17. try:
  18. whi = whois.whois(domain)
  19. expirationDate= whi.expiration_date
  20. notifyDate = str(expirationDate - time.datetime.timedelta(days=1))
  21. today=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  22. days=DomainNotify.getDays(notifyDate,today)
  23. if days <=3:
  24. push=EmailPush()
  25. push.push()
  26. res= False
  27. except Exception as e:
  28. if(str(e).index("No match")==0):
  29. res= True
  30. else:
  31. res= False
  32. self.saveRes(domain,res)
  33. self.notify(domain,res)
  34. @staticmethod
  35. def getDays(notifyDay:str, today:str):
  36. pass
  37. def notify(self,domain):
  38. '''结果推送'''
  39. pass
  40. def saveRes(damin:str, res:bool):
  41. # mysql.save()
  42. # file.save()
  43. pass
  44. def run(self):
  45. with open("res/res.json","w",encoding="utf8") as file:
  46. pool=ThreadPoolExecutor(max_workers=10)
  47. for i in range(100):
  48. pool.submit(self.crawl, domain)
  49. if __name__ == '__main__':
  50. sd = DomainNotify()
  51. sd.run()