| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
import argparse
import logging
import os
import threading
from concurrent.futures import ThreadPoolExecutor

import whois

from . import db
class SearchDomain(object):
    """Check a list of domains for availability via WHOIS and save the results.

    Domain names are read one per line from the input file, looked up
    concurrently on a thread pool, and each outcome is handed to
    ``db.File().save`` together with the output file path.
    """

    # Number of concurrent WHOIS lookups issued by run().
    MAX_WORKERS = 5

    class _CallbackHandler(logging.Handler):
        """Logging handler that forwards each formatted record to a callable.

        Defined once at class level (not inside ``__init__``) so that
        ``isinstance`` checks can recognise handlers installed by earlier
        ``SearchDomain`` instances.
        """

        def __init__(self, callback):
            super().__init__()
            self.callback = callback

        def emit(self, record):
            msg = self.format(record)
            if self.callback:
                self.callback(msg)

    def __init__(self, params: dict, debug=False, export_all=True, log_callback=None):
        """Store the configuration and set up logging.

        Args:
            params: Settings dict. ``"input"`` and ``"output"`` are read here;
                ``"app_path"`` is read later by ``run`` and ``saveRes``.
            debug: Use DEBUG instead of INFO for the root logging
                configuration. Has no effect when ``log_callback`` is given.
            export_all: When true, save the result of every domain; otherwise
                save only the domains found to be available.
            log_callback: Optional callable receiving each formatted log
                message (e.g. to display logs in a GUI).

        Raises:
            KeyError: If ``params`` lacks ``"input"`` or ``"output"``.
        """
        super().__init__()
        self.params = params
        self.export_all = export_all
        self.input = params["input"]
        self.output = params["output"]
        self.log_callback = log_callback
        # Whether the existence of the output file has already been checked.
        self._output_file_checked = False
        # Serialises saveRes(), which run() invokes from several threads.
        self._save_lock = threading.Lock()

        # Module-level logger; it is shared by every instance of this class.
        self.logger = logging.getLogger(__name__)

        if log_callback:
            self.logger.setLevel(logging.INFO)
            # The logger outlives this instance, so remove callback handlers
            # left behind by earlier instances. Otherwise each message would
            # be delivered once per instance ever created, including to
            # callbacks that are no longer current.
            for stale in list(self.logger.handlers):
                if isinstance(stale, self._CallbackHandler):
                    self.logger.removeHandler(stale)
            callback_handler = self._CallbackHandler(log_callback)
            callback_handler.setLevel(logging.INFO)
            callback_handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(callback_handler)
        elif debug:
            logging.basicConfig(level=logging.DEBUG, format='%(message)s', force=True)
        else:
            logging.basicConfig(level=logging.INFO, format='%(message)s', force=True)

    def crawl(self, domain: str, index: int) -> bool:
        """Look up one domain and record whether it is available.

        A lookup that completes without raising is treated as "registered".
        A lookup that raises with "No match" in the error text is treated as
        "available". Any other error is logged and counted as not available.

        Args:
            domain: Domain name to check.
            index: 1-based position of the domain, used only in log output.

        Returns:
            True if the domain is considered available, otherwise False.
        """
        available = False
        try:
            whois.whois(domain)
        except Exception as e:
            error_str = str(e)
            # NOTE(review): only the "No match" wording is recognised as
            # "not registered"; other wordings end up in the error branch.
            if "No match" in error_str:
                available = True
                self.logger.info("%s: searching domain:%s is avaliable.", index, domain)
            else:
                self.logger.error("Error checking %s: %s", domain, error_str)
        else:
            self.logger.info("%s: searching domain:%s is unavaliable.", index, domain)

        if available or self.export_all:
            self.saveRes(domain, available)
        return available

    def saveRes(self, domain: str, res: bool):
        """Save one result line ("<domain> <res>") for the output file.

        On the first call the output file (and its parent directory) is
        created if missing and a warning is logged. The whole operation runs
        under a lock because ``run`` calls it from several worker threads;
        without it a late creator could wipe results already written.

        Args:
            domain: Domain name that was checked.
            res: Availability result for the domain.
        """
        output_path = os.path.join(self.params["app_path"], self.output)
        with self._save_lock:
            if not self._output_file_checked:
                if not os.path.exists(output_path):
                    dir_path = os.path.dirname(output_path)
                    if dir_path:
                        os.makedirs(dir_path, exist_ok=True)
                    # Append mode creates the file without ever truncating it.
                    with open(output_path, 'a', encoding='utf-8'):
                        pass
                    self.logger.warning(f"警告:输出文件不存在,已创建: {output_path}")
                self._output_file_checked = True
            # db.File is defined in the project's db module (not visible
            # here); presumably it appends the line to the file -- confirm.
            db.File().save(output_path, domain + " " + str(res))

    def run(self):
        """Read the input file and check every non-blank line as a domain.

        ``self.input`` may be absolute or relative to ``params["app_path"]``.
        Blocks until all lookups have finished; an exception raised by a
        worker is re-raised here.
        """
        if os.path.isabs(self.input):
            input_path = self.input
        else:
            input_path = os.path.join(self.params["app_path"], self.input)

        with open(input_path, "r", encoding="utf8", errors="ignore") as file:
            stripped = (line.strip() for line in file)
            domains = [name for name in stripped if name]  # skip blank lines

        with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as pool:
            futures = [
                pool.submit(self.crawl, domain, index)
                for index, domain in enumerate(domains, start=1)
            ]
            # Wait for every task; result() surfaces worker exceptions.
            for future in futures:
                future.result()
if __name__ == '__main__':
    # SearchDomain requires a params dict with "input", "output" and
    # "app_path"; build it from the command line instead of calling the
    # constructor with no arguments (which raises TypeError).
    parser = argparse.ArgumentParser(
        description="Check which of the domains listed in a file are available."
    )
    parser.add_argument(
        "-i", "--input", required=True,
        help="file with one domain per line (absolute, or relative to --app-path)",
    )
    parser.add_argument(
        "-o", "--output", required=True,
        help="result file (absolute, or relative to --app-path)",
    )
    parser.add_argument(
        "--app-path", default=os.getcwd(),
        help="base directory for relative paths (default: current directory)",
    )
    parser.add_argument(
        "--debug", action="store_true",
        help="log at DEBUG level",
    )
    parser.add_argument(
        "--available-only", action="store_true",
        help="save only available domains instead of every result",
    )
    args = parser.parse_args()

    sd = SearchDomain(
        {"input": args.input, "output": args.output, "app_path": args.app_path},
        debug=args.debug,
        export_all=not args.available_only,
    )
    sd.run()
|