import whois from concurrent.futures import ThreadPoolExecutor import logging,os import argparse from . import db class SearchDomain(object): """search avaliable domain and save result""" def __init__(self, params: dict, debug=False, export_all=True): ''' 初始化 debug 调试模式 export_all 是否导出所有域名,默认导出可用域名 return: ''' super(SearchDomain, self).__init__() self.params = params self.export_all=export_all self.input=params["input"] self.output=params["output"] if debug == True: logging.basicConfig(level=logging.DEBUG) def crawl(self, domain: str, index:int) -> None: ''' 检测域名是否可用 :params domain 域名: :return true or false''' res = False try: whi = whois.whois(domain) res = False logging.info(str(index) + ": searching domain:"+ domain + " is unavaliable.") except Exception as e: error_str = str(e) # 检查是否是域名未注册的错误 if "No match" in error_str or "No match for" in error_str: res = True logging.info(str(index) + ": searching domain:"+ domain +" is avaliable.") else: res = False logging.error(f"Error checking {domain}: {error_str}") if self.export_all: self.saveRes(domain, res) else: if res: self.saveRes(domain, res) def saveRes(self, domain: str, res: bool): """ save result to file """ # db.Mysql().save() output_path = os.path.join(self.params["app_path"], self.output) db.File().save(output_path, domain + " " + str(res)) def run(self): '''begin search domain''' # 支持 input 为完整路径或相对路径 input_path = self.input if os.path.isabs(self.input) else os.path.join(self.params["app_path"], self.input) with open(input_path, "r", encoding="utf8", errors="ignore") as file: with ThreadPoolExecutor(max_workers=5) as pool: index = 0 futures = [] for line in file.readlines(): domain = line.strip() if domain: # 跳过空行 index = index + 1 future = pool.submit(self.crawl, domain, index) futures.append(future) # 等待所有任务完成 for future in futures: future.result() if __name__ == '__main__': sd = SearchDomain() sd.run()