domain_notify.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. import whois
  2. from concurrent.futures import ThreadPoolExecutor
  3. import os,sys,re,json
  4. # import requests
  5. class DomainNotify(object):
  6. """域名到期推送"""
  7. def __init__(self):
  8. super(DomainNotify,self).__init__()
  9. def crawl(self, domain:str)->None:
  10. '''
  11. 检测域名是否可用
  12. :params domain 域名:
  13. :return true or false'''
  14. res=False
  15. try:
  16. whi = whois.whois(domain)
  17. res= False
  18. except Exception as e:
  19. if(str(e).index("No match")==0):
  20. res= True
  21. else:
  22. res= False
  23. self.saveRes(domain,res)
  24. self.notify(domain,res)
  25. def notify(self,domain):
  26. '''结果推送'''
  27. pass
  28. def saveRes(damin:str, res:bool):
  29. # mysql.save()
  30. # file.save()
  31. pass
  32. def run(self):
  33. with open("res/res.json","w",encoding="utf8") as file:
  34. pool=ThreadPoolExecutor(max_workers=10)
  35. for i in range(100):
  36. pool.submit(self.crawl, domain)
  37. if __name__ == '__main__':
  38. sd = DomainNotify()
  39. sd.run()