import whois from concurrent.futures import ThreadPoolExecutor import logging,os import argparse from . import db class SearchDomain(object): """search avaliable domain and save result""" def __init__(self, params: dict, debug=False, export_all=True, log_callback=None): ''' 初始化 debug 调试模式 export_all 是否导出所有域名,默认导出可用域名 log_callback 日志回调函数,用于GUI模式下显示日志 return: ''' super(SearchDomain, self).__init__() self.params = params self.export_all=export_all self.input=params["input"] self.output=params["output"] self.log_callback = log_callback self._output_file_checked = False # 标记输出文件是否已检查 # 获取当前模块的日志记录器 self.logger = logging.getLogger(__name__) # 配置日志系统 if log_callback: # 如果有日志回调函数,创建自定义处理器 # 创建自定义处理器,将日志输出到回调函数 class CallbackHandler(logging.Handler): def __init__(self, callback): super().__init__() self.callback = callback def emit(self, record): msg = self.format(record) if self.callback: self.callback(msg) self.logger.setLevel(logging.INFO) # 检查是否已经添加了回调处理器,避免重复添加 has_callback_handler = any(isinstance(h, CallbackHandler) for h in self.logger.handlers) if not has_callback_handler: callback_handler = CallbackHandler(log_callback) callback_handler.setLevel(logging.INFO) formatter = logging.Formatter('%(message)s') callback_handler.setFormatter(formatter) self.logger.addHandler(callback_handler) elif debug == True: logging.basicConfig(level=logging.DEBUG, format='%(message)s', force=True) else: logging.basicConfig(level=logging.INFO, format='%(message)s', force=True) def crawl(self, domain: str, index:int) -> None: ''' 检测域名是否可用 :params domain 域名: :return true or false''' res = False try: whi = whois.whois(domain) res = False self.logger.info(str(index) + ": searching domain:"+ domain + " is unavaliable.") except Exception as e: error_str = str(e) # 检查是否是域名未注册的错误 if "No match" in error_str or "No match for" in error_str: res = True self.logger.info(str(index) + ": searching domain:"+ domain +" is avaliable.") else: res = False self.logger.error(f"Error checking {domain}: {error_str}") if self.export_all: self.saveRes(domain, res) else: if res: self.saveRes(domain, res) def saveRes(self, domain: str, res: bool): """ save result to file """ # db.Mysql().save() output_path = os.path.join(self.params["app_path"], self.output) # 检查输出文件是否存在,不存在则创建并警告(只检查一次) if not self._output_file_checked: if not os.path.exists(output_path): # 确保目录存在 dir_path = os.path.dirname(output_path) if dir_path and not os.path.exists(dir_path): os.makedirs(dir_path, exist_ok=True) # 创建文件 with open(output_path, 'w', encoding='utf-8') as f: pass self.logger.warning(f"警告:输出文件不存在,已创建: {output_path}") self._output_file_checked = True db.File().save(output_path, domain + " " + str(res)) def run(self): '''begin search domain''' # 支持 input 为完整路径或相对路径 input_path = self.input if os.path.isabs(self.input) else os.path.join(self.params["app_path"], self.input) with open(input_path, "r", encoding="utf8", errors="ignore") as file: with ThreadPoolExecutor(max_workers=5) as pool: index = 0 futures = [] for line in file.readlines(): domain = line.strip() if domain: # 跳过空行 index = index + 1 future = pool.submit(self.crawl, domain, index) futures.append(future) # 等待所有任务完成 for future in futures: future.result() if __name__ == '__main__': sd = SearchDomain() sd.run()