#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Contact :   liuyuqi.gov@msn.cn
@Time    :   2022/11/27 13:17:27
@License :   Copyright © 2017-2022 liuyuqi. All Rights Reserved.
@Desc    :
'''
from importlib.resources import path
import io
import os
import platform
import random
import shutil
import string
import subprocess
import sys
import time
import zipfile
from datetime import datetime

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import undetected_chromedriver.v2 as uc
from webdriver_manager.chrome import ChromeDriverManager


class Chrome():
    """Prepare a chromedriver binary and create Chrome webdriver instances."""

    # Path of the chromedriver executable; set in __init__.
    driverPath = None
    # Not assigned anywhere in this module.
    user_dir = None

    def __init__(self):
        """Create the working directories and install the chromedriver binary."""
        for directory in ("data", "data/driver", "data/browser-profiles"):
            # exist_ok avoids the check-then-create race of os.path.exists().
            os.makedirs(directory, exist_ok=True)

        download_driver()
        self.driverPath = ChromeDriverManager(path="data/driver").install()
        # Disabled legacy patching step, kept for reference:
        # if sys.platform == "win32":
        #     # shutil.move("chromedriver.exe", "data/driver")
        #     cd = os.path.abspath("data/driver/chromedriver.exe")
        # else:
        #     # shutil.move("chromedriver", "data/driver")
        # time.sleep(2.5)
        # #cd = os.path.abspath(path)
        # Patcher(executable_path=cd).patch_exe()

    @staticmethod
    def gen_random_cdc():
        """Generate a random 26-character identifier.

        The result is built from random lowercase ASCII letters, with the
        two characters at positions -6 and -5 upper-cased, position 2 set to
        the same letter as position 0 and position 3 set to ``"_"``.

        :returns: the identifier encoded as ``bytes``
        """
        cdc = random.choices(string.ascii_lowercase, k=26)
        cdc[-6:-4] = map(str.upper, cdc[-6:-4])
        cdc[2] = cdc[0]
        cdc[3] = "_"
        return "".join(cdc).encode()

    def monkey_patch_exe(self):
        """Patch the driver binary by rewriting its ``var key = `` lines.

        Every line of the file at ``self.driverPath`` that contains
        ``var key = `` is overwritten in place with a line that uses a freshly
        generated random key.

        :returns: the number of lines that were rewritten
        """
        random_key = self.gen_random_cdc()
        replacement = f" var key = '${random_key.decode()}_';\n".encode()

        patched = 0
        with io.open(self.driverPath, "r+b") as fh:
            for line in iter(fh.readline, b""):
                if b"var key = " not in line:
                    continue
                # Step back to the start of the line that was just read and
                # overwrite it where it stands.
                # NOTE(review): nothing checks that the replacement has the
                # same length as the matched line; a shorter replacement leaves
                # the tail of the old line behind and a longer one overwrites
                # the bytes that follow -- confirm against the target binary.
                fh.seek(-len(line), 1)
                fh.write(replacement)
                patched += 1
        return patched

    def getWebdriver(self, i=None, proxy=False, headless=False,
                     browser_profile=None, proxy_address=None):
        """Create a Chrome webdriver that uses the installed driver binary.

        All arguments are forwarded unchanged to ``self.options``.

        :param i: forwarded as ``i``
        :param proxy: proxy setting
        :param headless: whether to run in headless mode
        :param browser_profile: forwarded as ``browser_profile``
        :param proxy_address: forwarded as ``proxy_address``
        :returns: a ``webdriver.Chrome`` instance
        """
        # NOTE(review): no ``options`` method is defined on this class in the
        # code visible here; unless it is provided elsewhere (e.g. by a
        # subclass) this call raises AttributeError -- confirm.
        options = self.options(
            i=i,
            proxy=proxy,
            headless=headless,
            browser_profile=browser_profile,
            proxy_address=proxy_address,
        )
        return webdriver.Chrome(executable_path=self.driverPath, options=options)


def setup_useragent(driver: webdriver.Chrome):
    """Placeholder for overriding the browser user agent; currently a no-op."""
    # driver.execute_cdp_cmd("Network.setUserAgentOverride", {"userAgent": f"{random_user_agent}"})
    pass


def Proxy(PROXY_HOST, PROXY_PORT, PROXY_USER, PROXY_PASS, i):
    """Write a Chrome extension archive that configures an authenticated proxy.

    The archive is written to ``data/extension/proxy_auth_plugin_{i}.zip`` and
    holds a ``manifest.json`` plus a ``background.js`` that selects the given
    HTTP proxy and answers its authentication challenge.

    :param PROXY_HOST: proxy host name or address
    :param PROXY_PORT: proxy port
    :param PROXY_USER: user name sent when the proxy asks for credentials
    :param PROXY_PASS: password sent when the proxy asks for credentials
    :param i: suffix used in the archive file name
    :returns: the path of the archive, or ``False`` if building it failed
    """
    try:
        # "<all_urls>" is required both as a host permission and as the
        # listener filter; an empty string is not a valid match pattern.
        manifest_json = (
            ' { "manifest_version": 2, "name": "Proxy Manager",'
            ' "version": "3.0.11", "permissions": [ "proxy", "tabs",'
            ' "unlimitedStorage", "storage", "<all_urls>", "webRequest",'
            ' "webRequestBlocking" ], "background": { "scripts":'
            ' ["background.js"] }, "minimum_chrome_version":"22.0.0" } '
        )
        background_template = string.Template(
            ' var config = { mode: "fixed_servers", rules: { singleProxy: {'
            ' scheme: "http", host: "${PROXY_HOST}",'
            ' port: parseInt(${PROXY_PORT}) }, bypassList: ["foobar.com"] } };'
            ' chrome.proxy.settings.set({value: config, scope: "regular"},'
            ' function() {});'
            ' function callbackFn(details) { return { authCredentials: {'
            ' username: "${PROXY_USER}", password: "${PROXY_PASS}" } }; }'
            ' chrome.webRequest.onAuthRequired.addListener( callbackFn,'
            ' {urls: ["<all_urls>"]},'
            " ['blocking'] ); "
        )
        background_js = background_template.substitute(
            PROXY_HOST=PROXY_HOST,
            PROXY_PORT=PROXY_PORT,
            PROXY_USER=PROXY_USER,
            PROXY_PASS=PROXY_PASS,
        )

        os.makedirs("data/extension", exist_ok=True)
        archive_path = f'data/extension/proxy_auth_plugin_{i}.zip'
        with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED, False) as zp:
            zp.writestr('manifest.json', manifest_json)
            zp.writestr('background.js', background_js)
        return archive_path
    except Exception as e:
        # Report the failure before returning; callers only see False.
        now = datetime.now().strftime('%H:%M:%S')
        print(f'[{now}] - {e}')
        return False


class bcolors:
    """ANSI escape sequences used to colour console output."""

    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


# Client identifiers looked up below the Google Update registry key when
# searching for the installed Chrome version on Windows.
CHROME = ['{8A69D345-D564-463c-AFF1-A69D9E530F96}',
          '{8237E44A-0054-442C-B6B6-EA0509993955}',
          '{401C381F-E0DE-4B85-8BD8-3F3F14FBDA57}',
          '{4ea16ac7-fd5a-47c3-875b-dbf4a2008c20}']


def _query_registry_version(command):
    """Run a ``reg query`` command and return the last token of its output.

    Raises ``IndexError`` when the command prints nothing.
    """
    process = subprocess.Popen(command, stdout=subprocess.PIPE,
                               stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL)
    return process.communicate()[0].decode('UTF-8').strip().split()[-1]


def _windows_chrome_version():
    """Return the Chrome version found in the Windows registry, or ``None``."""
    try:
        return _query_registry_version(
            ['reg', 'query', 'HKEY_CURRENT_USER\\Software\\Google\\Chrome\\BLBeacon',
             '/v', 'version'])
    except Exception:
        # Best effort: fall through to the Google Update client keys.
        pass

    version = None
    for client_id in CHROME:
        for value_name in ('opv', 'pv'):
            command = ['reg', 'query',
                       f'HKEY_LOCAL_MACHINE\\Software\\Google\\Update\\Clients\\{client_id}',
                       '/v', value_name, '/reg:32']
            try:
                # Every key is tried; the last successful lookup wins.
                version = _query_registry_version(command)
            except Exception:
                # A missing key or value just means "try the next one".
                continue
    return version


def download_driver():
    """Detect the version of the locally installed Google Chrome.

    The version is read from ``google-chrome --version`` on Linux, from the
    application binary on macOS and from the registry on Windows, where the
    user is asked to type it in when the lookup fails. On any other platform a
    message is printed and the process exits via ``sys.exit()``.

    The detected version is not used further at the moment because the driver
    installation step at the end of the function is disabled.
    """
    os_name = platform.system()
    print(bcolors.WARNING + 'Getting Chrome Driver...' + bcolors.ENDC)

    if os_name == 'Linux':
        with subprocess.Popen(['google-chrome', '--version'],
                              stdout=subprocess.PIPE) as proc:
            version = proc.stdout.read().decode('utf-8').replace('Google Chrome', '').strip()
    elif os_name == 'Darwin':
        with subprocess.Popen(
                ['/Applications/Google Chrome.app/Contents/MacOS/Google Chrome', '--version'],
                stdout=subprocess.PIPE) as process:
            version = process.communicate()[0].decode('UTF-8').replace('Google Chrome', '').strip()
    elif os_name == 'Windows':
        version = _windows_chrome_version()
        if not version:
            print(bcolors.WARNING +
                  "Couldn't find your Google Chrome version automatically!" + bcolors.ENDC)
            version = input(bcolors.WARNING +
                            'Please input your google chrome version (ex: 91.0.4472.114) : ' +
                            bcolors.ENDC)
    else:
        print('{} OS is not supported.'.format(os_name))
        sys.exit()
    # uc.install()