#!/usr/bin/env python # -*- encoding: utf-8 -*- ''' @Contact : liuyuqi.gov@msn.cn @Time : 2025/09/30 22:31:02 @License : Copyright © 2017-2022 liuyuqi. All Rights Reserved. @Desc : alidriver_checkin crawler class ''' import requests import pickle import os # 数据目录 DATA_DIR = 'data' # 确保data目录存在 if not os.path.exists(DATA_DIR): os.makedirs(DATA_DIR) class AliDriverCheckin: _host = 'https://www.aliyundrive.com' headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36', 'Accept': 'application/json', 'Content-Type': 'application/json', 'Origin': 'https://lycheeshare.com', 'Referer': 'https://lycheeshare.com/profile' } def generate_account(self, number:int=20): """Generate random account credentials. :param int number: Number of accounts to generate. :return: A tuple of (username, password, email). """ import random import string # 常用单词和拼音列表 words = ['sky', 'sun', 'moon', 'star', 'cloud', 'rain', 'wind', 'fire', 'water', 'earth', 'cat', 'dog', 'bird', 'fish', 'tiger', 'lion', 'bear', 'wolf', 'fox', 'deer', 'blue', 'red', 'green', 'black', 'white', 'gold', 'silver', 'purple', 'orange', 'happy', 'lucky', 'smart', 'cool', 'fast', 'strong', 'brave', 'super', 'magic', 'wang', 'li', 'zhang', 'liu', 'chen', 'yang', 'huang', 'zhao', 'wu', 'zhou', 'xiao', 'ming', 'hong', 'wei', 'jie', 'lei', 'tao', 'jun', 'feng', 'long'] if number < 1: number = 1 if number > 100: number = 100 accounts = [] for _ in range(number): # 组合单词/拼音和数字 # word1 = random.choice(words) # word2 = random.choice(words) num = random.randint(100000000, 999999999) username = f"1{num}" # 密码:字母+数字组合,8-12位 password_len = random.randint(8, 12) password = ''.join(random.choices(string.ascii_letters + string.digits, k=password_len)) email = f"{username}@qq.com" accounts.append((username, email, password)) with open(os.path.join(DATA_DIR, 'generate_account.txt'), 'w') as f: for account in accounts: f.write(','.join(account) + '\n') return accounts def __init__(self, email, password): self.email = email self.password = password self.session = requests.Session() self.session.headers.update(self.headers) self.token = None self.cookie_file = os.path.join(DATA_DIR, "cookie" ,f'cookies_{email.replace("@", "_at_").replace(".", "_")}.pkl') self.x_client_id = self._get_x_client_id() or 'mgn9qt0o-3d5ecd48cff583a6' def get_captcha(self): """Send verification code to email.""" return True, "000000" def register(self, username, code): """Register a new account with verification code.""" url = f'{self._host}/api/auth/register' data = { 'email': self.email, 'password': self.password, 'username': username, 'code': code } try: response = self.session.post(url, json=data) result = response.json() if response.status_code == 200 and result.get('code') == 0: print(f"注册成功: {username}") return True, result else: print(f"注册失败: {result}") return False, result except Exception as e: print(f"注册异常: {e}") return False, str(e) def login(self): """Login to account and save auth token.""" url = f'{self._host}/api/auth/login' data = { 'email': self.email, 'password': self.password } try: response = self.session.post(url, json=data) result = response.json() if response.status_code == 200 and result.get('code') == 0: # Token在 data.token 中 if 'data' in result and 'token' in result['data']: self.token = result['data']['token'] # 设置 auth-token cookie self.session.cookies.set('auth-token', self.token, domain='lycheeshare.com', path='/') self.session.headers.update({'Authorization': f'Bearer {self.token}'}) # Save cookies to file self.save_cookies() print(f"登录成功: {self.email}") return True, result print(f"登录失败: {result}") return False, result except Exception as e: print(f"登录异常: {e}") return False, str(e) def _get_x_client_id(self): """Fetch x-client-id from the homepage. 访问:https://lycheeshare.com ,在源码中查找类似如下内容: window.__CLIENT_ID__ = 'mgn9qt0o-3d5ecd48cff583a6'; """ try: response = self.session.get(self._host) if response.status_code == 200: import re match = re.search(r"window\.__CLIENT_ID__\s*=\s*'([a-z0-9\-]+)'", response.text) if match: print(f"获取 x-client-id: {match.group(1)}") return match.group(1) print("无法获取 x-client-id") return None except Exception as e: print(f"获取 x-client-id 异常: {e}") return None def checkin(self): """Daily check-in to earn rewards.""" url = f'{self._host}/api/user/checkin' try: self.session.headers.update({'x-client-id': self.x_client_id}) response = self.session.post(url) result = response.json() if response.status_code == 200 and result.get('code') == 0: print(f"签到成功: {self.email} - {result.get('message', '')}") return True, result else: print(f"签到失败: {self.email} - {result}") return False, result except Exception as e: print(f"签到异常: {e}") return False, str(e) def reset_token(self): """Reset API token if get token fails.""" url = f'{self._host}/api/user/reset-api-token' try: response = self.session.post(url) result = response.json() if response.status_code == 200 and result.get('code') == 0: print(f"重置Token成功: {self.email}") return True, result else: print(f"重置Token失败: {result}") return False, result except Exception as e: print(f"重置Token异常: {e}") return False, str(e) def get_token(self): """Get API token for the account. If fails, reset and try again.""" url = f'{self._host}/api/user/get-api-token' try: response = self.session.get(url) result = response.json() if response.status_code == 200 and result.get('code') == 0: api_token = None if 'data' in result: api_token = result['data'].get('token') or result['data'].get('api_token') if api_token: print(f"获取Token成功: {self.email}") print(f"API Token: {api_token}") # Save to file with open(os.path.join(DATA_DIR, 'api_tokens.txt'), 'a') as f: f.write(f"{self.email},{api_token}\n") return True, result else: # Token为空,尝试重置 print(f"Token为空,尝试重置Token: {self.email}") success, reset_result = self.reset_token() if success: # 重置成功后再次获取 import time time.sleep(1) response = self.session.get(url) result = response.json() if response.status_code == 200 and result.get('code') == 0: if 'data' in result: api_token = result['data'].get('token') or result['data'].get('api_token') if api_token: print(f"重置后获取Token成功: {self.email}") print(f"API Token: {api_token}") with open(os.path.join(DATA_DIR, 'api_tokens.txt'), 'a') as f: f.write(f"{self.email},{api_token}\n") return True, result return False, result else: # 如果获取失败,尝试重置token print(f"获取Token失败,尝试重置Token: {self.email}") success, reset_result = self.reset_token() if success: # 重置成功后再次获取 import time time.sleep(1) response = self.session.get(url) result = response.json() if response.status_code == 200 and result.get('code') == 0: if 'data' in result: api_token = result['data'].get('token') or result['data'].get('api_token') if api_token: print(f"重置后获取Token成功: {self.email}") print(f"API Token: {api_token}") with open(os.path.join(DATA_DIR, 'api_tokens.txt'), 'a') as f: f.write(f"{self.email},{api_token}\n") return True, result return False, result except Exception as e: print(f"获取Token异常: {e}") return False, str(e) def get_credit(self): """查询账户额度信息""" # 确保有有效会话 try: self.session.get(f'{self._host}/profile') except: pass url = f'{self._host}/api/user/credit-bundles' try: response = self.session.get(url) result = response.json() if response.status_code == 200 and result.get('code') == 0: if 'data' in result: credit_data = result['data'] print(credit_data) total_credit = credit_data["summary"].get('total_credit', 0) used_credit = credit_data["summary"].get('used_credit', 0) available_credit = credit_data["summary"].get('available_credit', 0) print(f"额度查询成功: {self.email}") print(f" 总额度: {total_credit}") print(f" 已使用: {used_credit}") print(f" 可用额度: {available_credit}") # 保存到文件 with open(os.path.join(DATA_DIR, 'credits.txt'), 'a') as f: f.write(f"{self.email},{total_credit},{used_credit},{available_credit}\n") return True, result else: print(f"额度数据为空: {self.email}") return False, result else: print(f"查询额度失败: {self.email} - {result}") return False, result except Exception as e: print(f"查询额度异常: {e}") return False, str(e) def save_cookies(self): """Save session cookies to file.""" try: # 确保cookie目录存在 cookie_dir = os.path.dirname(self.cookie_file) if not os.path.exists(cookie_dir): os.makedirs(cookie_dir) with open(self.cookie_file, 'wb') as f: pickle.dump({ 'cookies': self.session.cookies, 'token': self.token }, f) print(f"Cookies已保存到 {self.cookie_file}") except Exception as e: print(f"保存Cookies失败: {e}") def load_cookies(self): """Load session cookies from file and validate.""" if os.path.exists(self.cookie_file): try: with open(self.cookie_file, 'rb') as f: data = pickle.load(f) self.session.cookies.update(data['cookies']) self.token = data.get('token') if self.token: self.session.headers.update({'Authorization': f'Bearer {self.token}'}) print(f"Cookies已加载: {self.cookie_file}") # 验证cookie有效性 print("验证Cookie有效性...") url = f'{self._host}/api/user/credit-bundles' try: response = self.session.get(url) if response.status_code == 200 and response.json().get('code') == 0: print("Cookie有效") return True else: print(f"Cookie已失效 (状态码: {response.status_code}),需要重新登录") return False except Exception as e: print(f"验证Cookie时出错: {e},需要重新登录") return False except Exception as e: print(f"加载Cookies失败: {e}") return False return False