import whois
from concurrent.futures import ThreadPoolExecutor
import logging
import argparse
from . import db
class SearchDomain(object):
    """Search for available domains and save the results.

    Domain names are read one per line from an input file, checked with
    WHOIS lookups on a small thread pool, and written out through
    ``db.File().save``.
    """

    def __init__(self, debug=False, export_all=False):
        """Set up the searcher and read ``--input`` / ``--output`` from argv.

        Args:
            debug: When true, configure logging at DEBUG level.
            export_all: When true, save every checked domain; otherwise
                only the domains found to be available are saved.
        """
        super(SearchDomain, self).__init__()
        self.export_all = export_all
        parser = argparse.ArgumentParser(description='Demo of argparse')
        parser.add_argument(
            "--input", help="set input domain list file,eg: domain.txt", type=str, default="domain.txt")
        parser.add_argument(
            "--output", help="set output domain result list file,eg: result.txt", type=str, default="result.txt")
        args = parser.parse_args()
        # Always define both attributes: an empty value on the command line
        # falls back to the default instead of leaving the attribute unset
        # (which would surface later as an AttributeError).
        self.input = args.input or "domain.txt"
        self.output = args.output or "result.txt"
        if debug:
            logging.basicConfig(level=logging.DEBUG)

    def crawl(self, domain: str, index: int) -> None:
        """Check whether ``domain`` is available and save the outcome.

        A domain counts as available only when the WHOIS lookup raises an
        error whose message starts with "No match". A successful lookup, or
        any other error, counts as unavailable.

        Args:
            domain: Domain name to look up.
            index: Position of the domain in the input; used in log lines.
        """
        available = False
        try:
            whois.whois(domain)
        except Exception as e:
            # Best-effort lookup: every failure is handled here. Use
            # startswith() rather than str.index(), which raises ValueError
            # when the text is missing and would abort this handler for
            # every unrelated error.
            if str(e).startswith("No match"):
                available = True
                logging.info("%s: searching domain:%s is avaliable.", index, domain)
            else:
                logging.error(e)
        else:
            logging.info("%s: searching domain:%s is unavaliable.", index, domain)
        if self.export_all or available:
            self.saveRes(domain, available)

    def saveRes(self, domain: str, res: bool):
        """Save one result as the line ``"<domain> <res>"`` to ``self.output``."""
        # Alternative backend, currently disabled: db.Mysql().save()
        db.File().save(self.output, domain + " " + str(res))

    def run(self):
        """Read domains from ``self.input`` and check each on a thread pool.

        Blank lines are skipped; the index passed to ``crawl`` is the
        1-based line number. Returns only after every submitted lookup has
        finished, and logs any exception a lookup task raised.
        """
        futures = []
        with open(self.input, "r", encoding="utf8", errors="ignore") as file:
            # The context manager waits for all tasks and shuts the pool down.
            with ThreadPoolExecutor(max_workers=5) as pool:
                for index, line in enumerate(file, start=1):
                    domain = line.strip()
                    if not domain:
                        continue
                    futures.append(pool.submit(self.crawl, domain, index))
        # Exceptions raised inside a worker are stored on its future; report
        # them instead of dropping them silently.
        for future in futures:
            error = future.exception()
            if error is not None:
                logging.error(error)
if __name__ == "__main__":
    # Script entry point: build a searcher with default settings and run it.
    # NOTE: this file uses a relative import (`from . import db`), so it has
    # to be executed as part of its package (e.g. with `python -m`).
    searcher = SearchDomain()
    searcher.run()