# searchdomain.py
import whois
from concurrent.futures import ThreadPoolExecutor
import logging
import os
import argparse

from . import db
  6. class SearchDomain(object):
  7. """search avaliable domain and save result"""
  8. def __init__(self, params: dict, debug=False, export_all=True):
  9. '''
  10. 初始化
  11. debug 调试模式
  12. export_all 是否导出所有域名,默认导出可用域名
  13. return:
  14. '''
  15. super(SearchDomain, self).__init__()
  16. self.params = params
  17. self.export_all=export_all
  18. self.input=params["input"]
  19. self.output=params["output"]
  20. if debug == True:
  21. logging.basicConfig(level=logging.DEBUG)
  22. def crawl(self, domain: str, index:int) -> None:
  23. '''
  24. 检测域名是否可用
  25. :params domain 域名:
  26. :return true or false'''
  27. res = False
  28. try:
  29. whi = whois.whois(domain)
  30. res = False
  31. logging.info(str(index) + ": searching domain:"+ domain + " is unavaliable.")
  32. except Exception as e:
  33. error_str = str(e)
  34. # 检查是否是域名未注册的错误
  35. if "No match" in error_str or "No match for" in error_str:
  36. res = True
  37. logging.info(str(index) + ": searching domain:"+ domain +" is avaliable.")
  38. else:
  39. res = False
  40. logging.error(f"Error checking {domain}: {error_str}")
  41. if self.export_all:
  42. self.saveRes(domain, res)
  43. else:
  44. if res:
  45. self.saveRes(domain, res)
  46. def saveRes(self, domain: str, res: bool):
  47. """ save result to file """
  48. # db.Mysql().save()
  49. output_path = os.path.join(self.params["app_path"], self.output)
  50. db.File().save(output_path, domain + " " + str(res))
  51. def run(self):
  52. '''begin search domain'''
  53. # 支持 input 为完整路径或相对路径
  54. input_path = self.input if os.path.isabs(self.input) else os.path.join(self.params["app_path"], self.input)
  55. with open(input_path, "r", encoding="utf8", errors="ignore") as file:
  56. with ThreadPoolExecutor(max_workers=5) as pool:
  57. index = 0
  58. futures = []
  59. for line in file.readlines():
  60. domain = line.strip()
  61. if domain: # 跳过空行
  62. index = index + 1
  63. future = pool.submit(self.crawl, domain, index)
  64. futures.append(future)
  65. # 等待所有任务完成
  66. for future in futures:
  67. future.result()
  68. if __name__ == '__main__':
  69. sd = SearchDomain()
  70. sd.run()