| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- import whois
- from concurrent.futures import ThreadPoolExecutor
- import logging,os
- import argparse
- from . import db
class SearchDomain(object):
    """Search for available domains and save the results.

    Domains are read one per line from an input file, each one is looked up
    with ``whois.whois`` on a thread pool, and the outcome is written to an
    output file through ``db.File().save``.
    """

    # Upper bound on the number of WHOIS lookups run() executes concurrently.
    MAX_WORKERS = 5

    class _CallbackHandler(logging.Handler):
        """Logging handler that forwards each formatted record to a callable.

        Defined once at class level so that ``isinstance`` checks can
        recognise a handler installed by an earlier ``SearchDomain`` instance.
        """

        def __init__(self, callback):
            super().__init__()
            self.callback = callback

        def emit(self, record):
            try:
                msg = self.format(record)
                if self.callback:
                    self.callback(msg)
            except Exception:
                # A failing callback must not break the code that logged the
                # message; report it through the logging error hook instead.
                self.handleError(record)

    def __init__(self, params: dict, debug=False, export_all=True, log_callback=None):
        """Initialise the searcher and configure logging.

        Args:
            params: Settings dictionary. ``"input"`` and ``"output"`` are read
                here; ``"app_path"`` is read later by ``run`` and ``saveRes``.
            debug: When true and no ``log_callback`` is given, the root logger
                is configured at DEBUG level instead of INFO.
            export_all: When true every checked domain is saved; otherwise
                only domains found to be available are saved.
            log_callback: Optional callable receiving each formatted log
                message (used to show logs in GUI mode).

        Raises:
            KeyError: If ``params`` has no ``"input"`` or ``"output"`` key.
        """
        super().__init__()
        self.params = params
        self.export_all = export_all
        self.input = params["input"]
        self.output = params["output"]
        self.log_callback = log_callback
        self._output_file_checked = False  # saveRes() checks the output file only once
        self._cancelled = False  # set by cancel(), polled by run() and crawl()

        # Logger of the current module; shared by every instance.
        self.logger = logging.getLogger(__name__)

        if log_callback:
            self.logger.setLevel(logging.INFO)
            self._attach_callback_handler(log_callback)
        elif debug:
            logging.basicConfig(level=logging.DEBUG, format='%(message)s', force=True)
        else:
            logging.basicConfig(level=logging.INFO, format='%(message)s', force=True)

    def _attach_callback_handler(self, log_callback):
        """Route the module logger to ``log_callback`` through a single handler.

        The logger is shared between instances, so an existing callback
        handler is re-pointed at the newest callback instead of adding a
        second handler, which would deliver every message more than once.
        """
        for handler in self.logger.handlers:
            if isinstance(handler, SearchDomain._CallbackHandler):
                handler.callback = log_callback
                return

        handler = SearchDomain._CallbackHandler(log_callback)
        handler.setLevel(logging.INFO)
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(handler)

    def _resolve_output_path(self):
        """Return the output file path joined onto ``params["app_path"]``."""
        return os.path.join(self.params["app_path"], self.output)

    def _ensure_output_file(self, output_path):
        """Create ``output_path`` (and its directory) if missing, with a warning."""
        if os.path.exists(output_path):
            return
        dir_path = os.path.dirname(output_path)
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)
        # Append mode creates the file without ever truncating content that
        # another thread may have written in the meantime.
        with open(output_path, 'a', encoding='utf-8'):
            pass
        self.logger.warning(f"警告:输出文件不存在,已创建: {output_path}")

    def cancel(self):
        """Request cancellation of the search task."""
        self._cancelled = True
        self.logger.info("搜索任务已取消")

    def crawl(self, domain: str, index: int) -> None:
        """Check whether ``domain`` is available and save the outcome.

        A lookup that raises an error whose text contains ``"No match"`` is
        treated as "available"; a lookup that succeeds, or fails with any
        other error, is treated as "not available".

        Args:
            domain: Domain name to look up.
            index: Sequence number of the domain, used only in log messages.

        Returns:
            None. The result is passed to ``saveRes`` unless the task has been
            cancelled.
        """
        if self._cancelled:
            return

        try:
            whois.whois(domain)
        except Exception as e:
            error_str = str(e)
            # NOTE(review): availability is inferred only from this message
            # text; any other failure is logged and counted as unavailable.
            if "No match" in error_str:
                res = True
                self.logger.info(str(index) + ": searching domain:" + domain + " is avaliable.")
            else:
                res = False
                self.logger.error(f"Error checking {domain}: {error_str}")
        else:
            # Logged outside the try so that a logging failure can never be
            # mistaken for a WHOIS error.
            res = False
            self.logger.info(str(index) + ": searching domain:" + domain + " is unavaliable.")

        # Check again: the lookup may have taken a while.
        if self._cancelled:
            return

        if self.export_all or res:
            self.saveRes(domain, res)

    def saveRes(self, domain: str, res: bool):
        """Save one result line, ``"<domain> <res>"``, to the output file.

        On the first call the output file is created if it does not exist.

        Args:
            domain: Domain name that was checked.
            res: True if the domain was found to be available.
        """
        output_path = self._resolve_output_path()
        if not self._output_file_checked:
            self._ensure_output_file(output_path)
            self._output_file_checked = True
        db.File().save(output_path, domain + " " + str(res))

    def run(self):
        """Read the input file and check every non-blank line as a domain.

        The output file is created up front if it is missing. ``input`` may
        be an absolute path or a path relative to ``params["app_path"]``.
        Lookups run on a pool of ``MAX_WORKERS`` threads.

        Raises:
            OSError: If the input file cannot be opened.
        """
        self._ensure_output_file(self._resolve_output_path())

        input_path = self.input if os.path.isabs(self.input) else os.path.join(self.params["app_path"], self.input)
        with open(input_path, "r", encoding="utf8", errors="ignore") as file:
            with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as pool:
                index = 0
                futures = []
                for line in file:
                    if self._cancelled:
                        break
                    domain = line.strip()
                    if not domain:  # skip blank lines
                        continue
                    index += 1
                    futures.append(pool.submit(self.crawl, domain, index))

                if self._cancelled:
                    # Drop every task that has not started yet; tasks already
                    # running return early via the checks in crawl().
                    for future in futures:
                        future.cancel()
                    self.logger.info("已取消所有未完成的任务")
                    return

                # Wait for all tasks so that worker errors are reported.
                for future in futures:
                    try:
                        future.result()
                    except Exception as e:
                        if not self._cancelled:
                            self.logger.error(f"任务执行出错: {str(e)}")
if __name__ == '__main__':
    # Command-line entry point. SearchDomain requires a params dictionary
    # with "input", "output" and "app_path", so build it from the arguments.
    parser = argparse.ArgumentParser(
        description="Check the domains listed in a file and save the results."
    )
    parser.add_argument("-i", "--input", required=True,
                        help="file with one domain per line (absolute, or relative to --app-path)")
    parser.add_argument("-o", "--output", required=True,
                        help="file the results are written to (relative to --app-path)")
    parser.add_argument("--app-path", default=os.getcwd(),
                        help="base directory for relative paths (default: current directory)")
    parser.add_argument("--debug", action="store_true",
                        help="log at DEBUG level")
    parser.add_argument("--available-only", action="store_true",
                        help="save only the domains found to be available")
    args = parser.parse_args()

    sd = SearchDomain(
        {"input": args.input, "output": args.output, "app_path": args.app_path},
        debug=args.debug,
        export_all=not args.available_only,
    )
    sd.run()
|