|
|
@@ -0,0 +1,338 @@
|
|
|
+#!/usr/bin/env python
|
|
|
+# -*- encoding: utf-8 -*-
|
|
|
+'''
|
|
|
+@Contact : liuyuqi.gov@msn.cn
|
|
|
+@Time : 2025/09/30 22:31:02
|
|
|
+@License : Copyright © 2017-2022 liuyuqi. All Rights Reserved.
|
|
|
+@Desc : alidriver_checkin crawler class
|
|
|
+'''
|
|
|
+
|
|
|
+import requests
|
|
|
+import pickle
|
|
|
+import os
|
|
|
+
|
|
|
+
|
|
|
+# 数据目录
|
|
|
+DATA_DIR = 'data'
|
|
|
+
|
|
|
+# 确保data目录存在
|
|
|
+if not os.path.exists(DATA_DIR):
|
|
|
+ os.makedirs(DATA_DIR)
|
|
|
+
|
|
|
+
|
|
|
+class AliDriverCheckin:
|
|
|
+ _host = 'https://www.aliyundrive.com'
|
|
|
+
|
|
|
+ headers = {
|
|
|
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
|
|
|
+ 'Accept': 'application/json',
|
|
|
+ 'Content-Type': 'application/json',
|
|
|
+ 'Origin': 'https://lycheeshare.com',
|
|
|
+ 'Referer': 'https://lycheeshare.com/profile'
|
|
|
+ }
|
|
|
+
|
|
|
+ def generate_account(self, number:int=20):
|
|
|
+ """Generate random account credentials.
|
|
|
+ :param int number: Number of accounts to generate.
|
|
|
+ :return: A tuple of (username, password, email).
|
|
|
+ """
|
|
|
+ import random
|
|
|
+ import string
|
|
|
+
|
|
|
+ # 常用单词和拼音列表
|
|
|
+ words = ['sky', 'sun', 'moon', 'star', 'cloud', 'rain', 'wind', 'fire', 'water', 'earth',
|
|
|
+ 'cat', 'dog', 'bird', 'fish', 'tiger', 'lion', 'bear', 'wolf', 'fox', 'deer',
|
|
|
+ 'blue', 'red', 'green', 'black', 'white', 'gold', 'silver', 'purple', 'orange',
|
|
|
+ 'happy', 'lucky', 'smart', 'cool', 'fast', 'strong', 'brave', 'super', 'magic',
|
|
|
+ 'wang', 'li', 'zhang', 'liu', 'chen', 'yang', 'huang', 'zhao', 'wu', 'zhou',
|
|
|
+ 'xiao', 'ming', 'hong', 'wei', 'jie', 'lei', 'tao', 'jun', 'feng', 'long']
|
|
|
+
|
|
|
+ if number < 1:
|
|
|
+ number = 1
|
|
|
+ if number > 100:
|
|
|
+ number = 100
|
|
|
+ accounts = []
|
|
|
+ for _ in range(number):
|
|
|
+ # 组合单词/拼音和数字
|
|
|
+ # word1 = random.choice(words)
|
|
|
+ # word2 = random.choice(words)
|
|
|
+ num = random.randint(100000000, 999999999)
|
|
|
+ username = f"1{num}"
|
|
|
+
|
|
|
+ # 密码:字母+数字组合,8-12位
|
|
|
+ password_len = random.randint(8, 12)
|
|
|
+ password = ''.join(random.choices(string.ascii_letters + string.digits, k=password_len))
|
|
|
+
|
|
|
+ email = f"{username}@qq.com"
|
|
|
+ accounts.append((username, email, password))
|
|
|
+
|
|
|
+ with open(os.path.join(DATA_DIR, 'generate_account.txt'), 'w') as f:
|
|
|
+ for account in accounts:
|
|
|
+ f.write(','.join(account) + '\n')
|
|
|
+
|
|
|
+ return accounts
|
|
|
+
|
|
|
+ def __init__(self, email, password):
|
|
|
+ self.email = email
|
|
|
+ self.password = password
|
|
|
+ self.session = requests.Session()
|
|
|
+ self.session.headers.update(self.headers)
|
|
|
+ self.token = None
|
|
|
+ self.cookie_file = os.path.join(DATA_DIR, "cookie" ,f'cookies_{email.replace("@", "_at_").replace(".", "_")}.pkl')
|
|
|
+ self.x_client_id = self._get_x_client_id() or 'mgn9qt0o-3d5ecd48cff583a6'
|
|
|
+
|
|
|
+ def get_captcha(self):
|
|
|
+ """Send verification code to email."""
|
|
|
+ return True, "000000"
|
|
|
+
|
|
|
+ def register(self, username, code):
|
|
|
+ """Register a new account with verification code."""
|
|
|
+ url = f'{self._host}/api/auth/register'
|
|
|
+ data = {
|
|
|
+ 'email': self.email,
|
|
|
+ 'password': self.password,
|
|
|
+ 'username': username,
|
|
|
+ 'code': code
|
|
|
+ }
|
|
|
+ try:
|
|
|
+ response = self.session.post(url, json=data)
|
|
|
+ result = response.json()
|
|
|
+ if response.status_code == 200 and result.get('code') == 0:
|
|
|
+ print(f"注册成功: {username}")
|
|
|
+ return True, result
|
|
|
+ else:
|
|
|
+ print(f"注册失败: {result}")
|
|
|
+ return False, result
|
|
|
+ except Exception as e:
|
|
|
+ print(f"注册异常: {e}")
|
|
|
+ return False, str(e)
|
|
|
+
|
|
|
+ def login(self):
|
|
|
+ """Login to account and save auth token."""
|
|
|
+ url = f'{self._host}/api/auth/login'
|
|
|
+ data = {
|
|
|
+ 'email': self.email,
|
|
|
+ 'password': self.password
|
|
|
+ }
|
|
|
+ try:
|
|
|
+ response = self.session.post(url, json=data)
|
|
|
+ result = response.json()
|
|
|
+ if response.status_code == 200 and result.get('code') == 0:
|
|
|
+ # Token在 data.token 中
|
|
|
+ if 'data' in result and 'token' in result['data']:
|
|
|
+ self.token = result['data']['token']
|
|
|
+ # 设置 auth-token cookie
|
|
|
+ self.session.cookies.set('auth-token', self.token, domain='lycheeshare.com', path='/')
|
|
|
+ self.session.headers.update({'Authorization': f'Bearer {self.token}'})
|
|
|
+ # Save cookies to file
|
|
|
+ self.save_cookies()
|
|
|
+ print(f"登录成功: {self.email}")
|
|
|
+ return True, result
|
|
|
+ print(f"登录失败: {result}")
|
|
|
+ return False, result
|
|
|
+ except Exception as e:
|
|
|
+ print(f"登录异常: {e}")
|
|
|
+ return False, str(e)
|
|
|
+
|
|
|
+ def _get_x_client_id(self):
|
|
|
+ """Fetch x-client-id from the homepage.
|
|
|
+ 访问:https://lycheeshare.com ,在源码中查找类似如下内容:
|
|
|
+ window.__CLIENT_ID__ = 'mgn9qt0o-3d5ecd48cff583a6';
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ response = self.session.get(self._host)
|
|
|
+ if response.status_code == 200:
|
|
|
+ import re
|
|
|
+ match = re.search(r"window\.__CLIENT_ID__\s*=\s*'([a-z0-9\-]+)'", response.text)
|
|
|
+ if match:
|
|
|
+ print(f"获取 x-client-id: {match.group(1)}")
|
|
|
+ return match.group(1)
|
|
|
+ print("无法获取 x-client-id")
|
|
|
+ return None
|
|
|
+ except Exception as e:
|
|
|
+ print(f"获取 x-client-id 异常: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+ def checkin(self):
|
|
|
+ """Daily check-in to earn rewards."""
|
|
|
+ url = f'{self._host}/api/user/checkin'
|
|
|
+ try:
|
|
|
+ self.session.headers.update({'x-client-id': self.x_client_id})
|
|
|
+ response = self.session.post(url)
|
|
|
+ result = response.json()
|
|
|
+ if response.status_code == 200 and result.get('code') == 0:
|
|
|
+ print(f"签到成功: {self.email} - {result.get('message', '')}")
|
|
|
+ return True, result
|
|
|
+ else:
|
|
|
+ print(f"签到失败: {self.email} - {result}")
|
|
|
+ return False, result
|
|
|
+ except Exception as e:
|
|
|
+ print(f"签到异常: {e}")
|
|
|
+ return False, str(e)
|
|
|
+
|
|
|
+ def reset_token(self):
|
|
|
+ """Reset API token if get token fails."""
|
|
|
+ url = f'{self._host}/api/user/reset-api-token'
|
|
|
+ try:
|
|
|
+ response = self.session.post(url)
|
|
|
+ result = response.json()
|
|
|
+ if response.status_code == 200 and result.get('code') == 0:
|
|
|
+ print(f"重置Token成功: {self.email}")
|
|
|
+ return True, result
|
|
|
+ else:
|
|
|
+ print(f"重置Token失败: {result}")
|
|
|
+ return False, result
|
|
|
+ except Exception as e:
|
|
|
+ print(f"重置Token异常: {e}")
|
|
|
+ return False, str(e)
|
|
|
+
|
|
|
+ def get_token(self):
|
|
|
+ """Get API token for the account. If fails, reset and try again."""
|
|
|
+ url = f'{self._host}/api/user/get-api-token'
|
|
|
+ try:
|
|
|
+ response = self.session.get(url)
|
|
|
+ result = response.json()
|
|
|
+ if response.status_code == 200 and result.get('code') == 0:
|
|
|
+ api_token = None
|
|
|
+ if 'data' in result:
|
|
|
+ api_token = result['data'].get('token') or result['data'].get('api_token')
|
|
|
+
|
|
|
+ if api_token:
|
|
|
+ print(f"获取Token成功: {self.email}")
|
|
|
+ print(f"API Token: {api_token}")
|
|
|
+ # Save to file
|
|
|
+ with open(os.path.join(DATA_DIR, 'api_tokens.txt'), 'a') as f:
|
|
|
+ f.write(f"{self.email},{api_token}\n")
|
|
|
+ return True, result
|
|
|
+ else:
|
|
|
+ # Token为空,尝试重置
|
|
|
+ print(f"Token为空,尝试重置Token: {self.email}")
|
|
|
+ success, reset_result = self.reset_token()
|
|
|
+ if success:
|
|
|
+ # 重置成功后再次获取
|
|
|
+ import time
|
|
|
+ time.sleep(1)
|
|
|
+ response = self.session.get(url)
|
|
|
+ result = response.json()
|
|
|
+ if response.status_code == 200 and result.get('code') == 0:
|
|
|
+ if 'data' in result:
|
|
|
+ api_token = result['data'].get('token') or result['data'].get('api_token')
|
|
|
+ if api_token:
|
|
|
+ print(f"重置后获取Token成功: {self.email}")
|
|
|
+ print(f"API Token: {api_token}")
|
|
|
+ with open(os.path.join(DATA_DIR, 'api_tokens.txt'), 'a') as f:
|
|
|
+ f.write(f"{self.email},{api_token}\n")
|
|
|
+ return True, result
|
|
|
+ return False, result
|
|
|
+ else:
|
|
|
+ # 如果获取失败,尝试重置token
|
|
|
+ print(f"获取Token失败,尝试重置Token: {self.email}")
|
|
|
+ success, reset_result = self.reset_token()
|
|
|
+ if success:
|
|
|
+ # 重置成功后再次获取
|
|
|
+ import time
|
|
|
+ time.sleep(1)
|
|
|
+ response = self.session.get(url)
|
|
|
+ result = response.json()
|
|
|
+ if response.status_code == 200 and result.get('code') == 0:
|
|
|
+ if 'data' in result:
|
|
|
+ api_token = result['data'].get('token') or result['data'].get('api_token')
|
|
|
+ if api_token:
|
|
|
+ print(f"重置后获取Token成功: {self.email}")
|
|
|
+ print(f"API Token: {api_token}")
|
|
|
+ with open(os.path.join(DATA_DIR, 'api_tokens.txt'), 'a') as f:
|
|
|
+ f.write(f"{self.email},{api_token}\n")
|
|
|
+ return True, result
|
|
|
+ return False, result
|
|
|
+ except Exception as e:
|
|
|
+ print(f"获取Token异常: {e}")
|
|
|
+ return False, str(e)
|
|
|
+
|
|
|
+ def get_credit(self):
|
|
|
+ """查询账户额度信息"""
|
|
|
+ # 确保有有效会话
|
|
|
+ try:
|
|
|
+ self.session.get(f'{self._host}/profile')
|
|
|
+ except:
|
|
|
+ pass
|
|
|
+
|
|
|
+ url = f'{self._host}/api/user/credit-bundles'
|
|
|
+ try:
|
|
|
+ response = self.session.get(url)
|
|
|
+ result = response.json()
|
|
|
+ if response.status_code == 200 and result.get('code') == 0:
|
|
|
+ if 'data' in result:
|
|
|
+ credit_data = result['data']
|
|
|
+ print(credit_data)
|
|
|
+ total_credit = credit_data["summary"].get('total_credit', 0)
|
|
|
+ used_credit = credit_data["summary"].get('used_credit', 0)
|
|
|
+ available_credit = credit_data["summary"].get('available_credit', 0)
|
|
|
+
|
|
|
+ print(f"额度查询成功: {self.email}")
|
|
|
+ print(f" 总额度: {total_credit}")
|
|
|
+ print(f" 已使用: {used_credit}")
|
|
|
+ print(f" 可用额度: {available_credit}")
|
|
|
+
|
|
|
+ # 保存到文件
|
|
|
+ with open(os.path.join(DATA_DIR, 'credits.txt'), 'a') as f:
|
|
|
+ f.write(f"{self.email},{total_credit},{used_credit},{available_credit}\n")
|
|
|
+
|
|
|
+ return True, result
|
|
|
+ else:
|
|
|
+ print(f"额度数据为空: {self.email}")
|
|
|
+ return False, result
|
|
|
+ else:
|
|
|
+ print(f"查询额度失败: {self.email} - {result}")
|
|
|
+ return False, result
|
|
|
+ except Exception as e:
|
|
|
+ print(f"查询额度异常: {e}")
|
|
|
+ return False, str(e)
|
|
|
+
|
|
|
+
|
|
|
+ def save_cookies(self):
|
|
|
+ """Save session cookies to file."""
|
|
|
+ try:
|
|
|
+ # 确保cookie目录存在
|
|
|
+ cookie_dir = os.path.dirname(self.cookie_file)
|
|
|
+ if not os.path.exists(cookie_dir):
|
|
|
+ os.makedirs(cookie_dir)
|
|
|
+
|
|
|
+ with open(self.cookie_file, 'wb') as f:
|
|
|
+ pickle.dump({
|
|
|
+ 'cookies': self.session.cookies,
|
|
|
+ 'token': self.token
|
|
|
+ }, f)
|
|
|
+ print(f"Cookies已保存到 {self.cookie_file}")
|
|
|
+ except Exception as e:
|
|
|
+ print(f"保存Cookies失败: {e}")
|
|
|
+
|
|
|
+ def load_cookies(self):
|
|
|
+ """Load session cookies from file and validate."""
|
|
|
+ if os.path.exists(self.cookie_file):
|
|
|
+ try:
|
|
|
+ with open(self.cookie_file, 'rb') as f:
|
|
|
+ data = pickle.load(f)
|
|
|
+ self.session.cookies.update(data['cookies'])
|
|
|
+ self.token = data.get('token')
|
|
|
+ if self.token:
|
|
|
+ self.session.headers.update({'Authorization': f'Bearer {self.token}'})
|
|
|
+ print(f"Cookies已加载: {self.cookie_file}")
|
|
|
+
|
|
|
+ # 验证cookie有效性
|
|
|
+ print("验证Cookie有效性...")
|
|
|
+ url = f'{self._host}/api/user/credit-bundles'
|
|
|
+ try:
|
|
|
+ response = self.session.get(url)
|
|
|
+ if response.status_code == 200 and response.json().get('code') == 0:
|
|
|
+ print("Cookie有效")
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ print(f"Cookie已失效 (状态码: {response.status_code}),需要重新登录")
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ print(f"验证Cookie时出错: {e},需要重新登录")
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ print(f"加载Cookies失败: {e}")
|
|
|
+ return False
|
|
|
+ return False
|