import whois from concurrent.futures import ThreadPoolExecutor import os,sys,re,json # import requests class DomainNotify(object): """域名到期推送""" def __init__(self): super(DomainNotify,self).__init__() def crawl(self, domain:str)->None: ''' 检测域名是否可用 :params domain 域名: :return true or false''' res=False try: whi = whois.whois(domain) res= False except Exception as e: if(str(e).index("No match")==0): res= True else: res= False self.saveRes(domain,res) self.notify(domain,res) def notify(self,domain): '''结果推送''' pass def saveRes(damin:str, res:bool): # mysql.save() # file.save() pass def run(self): with open("res/res.json","w",encoding="utf8") as file: pool=ThreadPoolExecutor(max_workers=10) for i in range(100): pool.submit(self.crawl, domain) if __name__ == '__main__': sd = DomainNotify() sd.run()