123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- #!/usr/bin/env python
- # -*- encoding: utf-8 -*-
- '''
- @Contact : liuyuqi.gov@msn.cn
- @Time : 2022/11/27 13:17:27
- @License : Copyright © 2017-2022 liuyuqi. All Rights Reserved.
- @Desc :
- '''
- from importlib.resources import path
- import random, string, io, os, shutil, platform, subprocess, sys, zipfile, time
- from selenium import webdriver
- from selenium.webdriver.chrome.options import Options
- import undetected_chromedriver.v2 as uc
- from webdriver_manager.chrome import ChromeDriverManager
- class Chrome():
- driverPath = None
- user_dir = None
- def __init__(self):
- os.makedirs("data") if not os.path.exists("data") else False
- os.makedirs("data/driver") if not os.path.exists("data/driver") else False
- os.makedirs("data/browser-profiles") if not os.path.exists("data/browser-profiles") else False
- download_driver()
- self.driverPath = ChromeDriverManager(path="data/driver").install()
- # if sys.platform == "win32":
- # # shutil.move("chromedriver.exe", "data/driver")
- # cd = os.path.abspath("data/driver/chromedriver.exe")
- # else:
- # # shutil.move("chromedriver", "data/driver")
- # time.sleep(2.5)
- # #cd = os.path.abspath(path)
- # Patcher(executable_path=cd).patch_exe()
-
- @staticmethod
- def gen_random_cdc():
- '''随机生成字符串'''
- cdc = random.choices(string.ascii_lowercase, k=26)
- cdc[-6:-4] = map(str.upper, cdc[-6:-4])
- cdc[2] = cdc[0]
- cdc[3] = "_"
- return "".join(cdc).encode()
- def monkey_patch_exe(self):
- '''添加补丁'''
- linect = 0
- replacement = self.gen_random_cdc()
- replacement = f" var key = '${replacement.decode()}_';\n".encode()
- with io.open(self.driverPath, "r+b") as fh:
- for line in iter(lambda: fh.readline(), b""):
- if b"var key = " in line:
- fh.seek(-len(line), 1)
- fh.write(replacement)
- linect += 1
- return linect
-
- def getWebdriver(self, i=None, proxy=False, headless=False, browser_profile=None, proxy_address=None):
- '''
- get webdriver
-
- :param i: i
- :param proxy: 代理
- :param headless: 是否无头模式
- :param browser_profile: i
- :param proxy_address: i
- :returns: webdriver
- '''
- options = self.options(i=i, proxy=proxy, headless=headless, browser_profile=browser_profile, proxy_address=proxy_address)
- return webdriver.Chrome(executable_path=self.driverPath, options=options)
- def setup_useragent(driver :webdriver.Chrome):
- # driver.execute_cdp_cmd("Network.setUserAgentOverride", {"userAgent": f"{random_user_agent}"})
- pass
- def Proxy(PROXY_HOST, PROXY_PORT, PROXY_USER, PROXY_PASS, i):
- try:
- manifest_json = """
- {
- "manifest_version": 2,
- "name": "Proxy Manager",
- "version": "3.0.11",
- "permissions": [
- "proxy",
- "tabs",
- "unlimitedStorage",
- "storage",
- "<all_urls>",
- "webRequest",
- "webRequestBlocking"
- ],
- "background": {
- "scripts": ["background.js"]
- },
- "minimum_chrome_version":"22.0.0"
- }
- """
- background_js = string.Template(
- """
- var config = {
- mode: "fixed_servers",
- rules: {
- singleProxy: {
- scheme: "http",
- host: "${PROXY_HOST}",
- port: parseInt(${PROXY_PORT})
- },
- bypassList: ["foobar.com"]
- }
- };
- chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
- function callbackFn(details) {
- return {
- authCredentials: {
- username: "${PROXY_USER}",
- password: "${PROXY_PASS}"
- }
- };
- }
- chrome.webRequest.onAuthRequired.addListener(
- callbackFn,
- {urls: ["<all_urls>"]},
- ['blocking']
- );
- """
- ).substitute(
- PROXY_HOST=PROXY_HOST,
- PROXY_PORT=PROXY_PORT,
- PROXY_USER=PROXY_USER,
- PROXY_PASS=PROXY_PASS)
-
- if not os.path.exists("data/extension"):
- os.makedirs("data/extension")
- with zipfile.ZipFile(f'data/extension/proxy_auth_plugin_{i}.zip', 'w', zipfile.ZIP_DEFLATED, False) as zp:
- zp.writestr('manifest.json', manifest_json)
- zp.writestr('background.js', background_js)
- return f"data/extension/proxy_auth_plugin_{i}.zip"
- except Exception as e:
- return False
- now = datetime.now().strftime('%H:%M:%S')
- print(f'[{now}] - {e}')
- class bcolors:
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKCYAN = '\033[96m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
- BOLD = '\033[1m'
- UNDERLINE = '\033[4m'
- CHROME = ['{8A69D345-D564-463c-AFF1-A69D9E530F96}',
- '{8237E44A-0054-442C-B6B6-EA0509993955}',
- '{401C381F-E0DE-4B85-8BD8-3F3F14FBDA57}',
- '{4ea16ac7-fd5a-47c3-875b-dbf4a2008c20}']
- def download_driver():
- '''根据系统安装的chrome版本,下载指定版本的 chrome-driver 驱动'''
- OSNAME = platform.system()
- print(bcolors.WARNING + 'Getting Chrome Driver...' + bcolors.ENDC)
- if OSNAME == 'Linux':
- OSNAME = 'lin'
- EXE_NAME = ""
- with subprocess.Popen(['google-chrome', '--version'], stdout=subprocess.PIPE) as proc:
- version = proc.stdout.read().decode('utf-8').replace('Google Chrome', '').strip()
- elif OSNAME == 'Darwin':
- OSNAME = 'mac'
- EXE_NAME = ""
- process = subprocess.Popen(['/Applications/Google Chrome.app/Contents/MacOS/Google Chrome', '--version'], stdout=subprocess.PIPE)
- version = process.communicate()[0].decode('UTF-8').replace('Google Chrome', '').strip()
- elif OSNAME == 'Windows':
- OSNAME = 'win'
- EXE_NAME = ".exe"
- version = None
- try:
- process = subprocess.Popen(['reg', 'query', 'HKEY_CURRENT_USER\\Software\\Google\\Chrome\\BLBeacon', '/v', 'version'], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL)
- version = process.communicate()[0].decode(
- 'UTF-8').strip().split()[-1]
- except:
- for i in CHROME:
- for j in ['opv', 'pv']:
- try:
- command = ['reg', 'query', f'HKEY_LOCAL_MACHINE\\Software\\Google\\Update\\Clients\\{i}', '/v', f'{j}', '/reg:32']
- process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL)
- version = process.communicate()[0].decode('UTF-8').strip().split()[-1]
- except:
- pass
- if not version:
- print(bcolors.WARNING + "Couldn't find your Google Chrome version automatically!" + bcolors.ENDC)
- version = input(bcolors.WARNING + 'Please input your google chrome version (ex: 91.0.4472.114) : ' + bcolors.ENDC)
- else:
- print('{} OS is not supported.'.format(OSNAME))
- sys.exit()
- # uc.install()
|