| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- #!/usr/bin/env python
- # -*- encoding: utf-8 -*-
- '''
- @Contact : liuyuqi.gov@msn.cn
- @Time : 2025/09/30 22:31:02
- @License : Copyright © 2017-2022 liuyuqi. All Rights Reserved.
- @Desc : alidriver_checkin crawler class
- '''
- import requests
- import pickle
- import os
- # 数据目录
- DATA_DIR = 'data'
- # 确保data目录存在
- if not os.path.exists(DATA_DIR):
- os.makedirs(DATA_DIR)
- class AliDriverCheckin:
- _host = 'https://www.aliyundrive.com'
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36',
- 'Accept': 'application/json',
- 'Content-Type': 'application/json',
- 'Origin': 'https://lycheeshare.com',
- 'Referer': 'https://lycheeshare.com/profile'
- }
- def generate_account(self, number:int=20):
- """Generate random account credentials.
- :param int number: Number of accounts to generate.
- :return: A tuple of (username, password, email).
- """
- import random
- import string
- # 常用单词和拼音列表
- words = ['sky', 'sun', 'moon', 'star', 'cloud', 'rain', 'wind', 'fire', 'water', 'earth',
- 'cat', 'dog', 'bird', 'fish', 'tiger', 'lion', 'bear', 'wolf', 'fox', 'deer',
- 'blue', 'red', 'green', 'black', 'white', 'gold', 'silver', 'purple', 'orange',
- 'happy', 'lucky', 'smart', 'cool', 'fast', 'strong', 'brave', 'super', 'magic',
- 'wang', 'li', 'zhang', 'liu', 'chen', 'yang', 'huang', 'zhao', 'wu', 'zhou',
- 'xiao', 'ming', 'hong', 'wei', 'jie', 'lei', 'tao', 'jun', 'feng', 'long']
- if number < 1:
- number = 1
- if number > 100:
- number = 100
- accounts = []
- for _ in range(number):
- # 组合单词/拼音和数字
- # word1 = random.choice(words)
- # word2 = random.choice(words)
- num = random.randint(100000000, 999999999)
- username = f"1{num}"
- # 密码:字母+数字组合,8-12位
- password_len = random.randint(8, 12)
- password = ''.join(random.choices(string.ascii_letters + string.digits, k=password_len))
- email = f"{username}@qq.com"
- accounts.append((username, email, password))
- with open(os.path.join(DATA_DIR, 'generate_account.txt'), 'w') as f:
- for account in accounts:
- f.write(','.join(account) + '\n')
- return accounts
- def __init__(self, email, password):
- self.email = email
- self.password = password
- self.session = requests.Session()
- self.session.headers.update(self.headers)
- self.token = None
- self.cookie_file = os.path.join(DATA_DIR, "cookie" ,f'cookies_{email.replace("@", "_at_").replace(".", "_")}.pkl')
- self.x_client_id = self._get_x_client_id() or 'mgn9qt0o-3d5ecd48cff583a6'
- def get_captcha(self):
- """Send verification code to email."""
- return True, "000000"
- def register(self, username, code):
- """Register a new account with verification code."""
- url = f'{self._host}/api/auth/register'
- data = {
- 'email': self.email,
- 'password': self.password,
- 'username': username,
- 'code': code
- }
- try:
- response = self.session.post(url, json=data)
- result = response.json()
- if response.status_code == 200 and result.get('code') == 0:
- print(f"注册成功: {username}")
- return True, result
- else:
- print(f"注册失败: {result}")
- return False, result
- except Exception as e:
- print(f"注册异常: {e}")
- return False, str(e)
- def login(self):
- """Login to account and save auth token."""
- url = f'{self._host}/api/auth/login'
- data = {
- 'email': self.email,
- 'password': self.password
- }
- try:
- response = self.session.post(url, json=data)
- result = response.json()
- if response.status_code == 200 and result.get('code') == 0:
- # Token在 data.token 中
- if 'data' in result and 'token' in result['data']:
- self.token = result['data']['token']
- # 设置 auth-token cookie
- self.session.cookies.set('auth-token', self.token, domain='lycheeshare.com', path='/')
- self.session.headers.update({'Authorization': f'Bearer {self.token}'})
- # Save cookies to file
- self.save_cookies()
- print(f"登录成功: {self.email}")
- return True, result
- print(f"登录失败: {result}")
- return False, result
- except Exception as e:
- print(f"登录异常: {e}")
- return False, str(e)
- def _get_x_client_id(self):
- """Fetch x-client-id from the homepage.
- 访问:https://lycheeshare.com ,在源码中查找类似如下内容:
- window.__CLIENT_ID__ = 'mgn9qt0o-3d5ecd48cff583a6';
- """
- try:
- response = self.session.get(self._host)
- if response.status_code == 200:
- import re
- match = re.search(r"window\.__CLIENT_ID__\s*=\s*'([a-z0-9\-]+)'", response.text)
- if match:
- print(f"获取 x-client-id: {match.group(1)}")
- return match.group(1)
- print("无法获取 x-client-id")
- return None
- except Exception as e:
- print(f"获取 x-client-id 异常: {e}")
- return None
- def checkin(self):
- """Daily check-in to earn rewards."""
- url = f'{self._host}/api/user/checkin'
- try:
- self.session.headers.update({'x-client-id': self.x_client_id})
- response = self.session.post(url)
- result = response.json()
- if response.status_code == 200 and result.get('code') == 0:
- print(f"签到成功: {self.email} - {result.get('message', '')}")
- return True, result
- else:
- print(f"签到失败: {self.email} - {result}")
- return False, result
- except Exception as e:
- print(f"签到异常: {e}")
- return False, str(e)
- def reset_token(self):
- """Reset API token if get token fails."""
- url = f'{self._host}/api/user/reset-api-token'
- try:
- response = self.session.post(url)
- result = response.json()
- if response.status_code == 200 and result.get('code') == 0:
- print(f"重置Token成功: {self.email}")
- return True, result
- else:
- print(f"重置Token失败: {result}")
- return False, result
- except Exception as e:
- print(f"重置Token异常: {e}")
- return False, str(e)
- def get_token(self):
- """Get API token for the account. If fails, reset and try again."""
- url = f'{self._host}/api/user/get-api-token'
- try:
- response = self.session.get(url)
- result = response.json()
- if response.status_code == 200 and result.get('code') == 0:
- api_token = None
- if 'data' in result:
- api_token = result['data'].get('token') or result['data'].get('api_token')
- if api_token:
- print(f"获取Token成功: {self.email}")
- print(f"API Token: {api_token}")
- # Save to file
- with open(os.path.join(DATA_DIR, 'api_tokens.txt'), 'a') as f:
- f.write(f"{self.email},{api_token}\n")
- return True, result
- else:
- # Token为空,尝试重置
- print(f"Token为空,尝试重置Token: {self.email}")
- success, reset_result = self.reset_token()
- if success:
- # 重置成功后再次获取
- import time
- time.sleep(1)
- response = self.session.get(url)
- result = response.json()
- if response.status_code == 200 and result.get('code') == 0:
- if 'data' in result:
- api_token = result['data'].get('token') or result['data'].get('api_token')
- if api_token:
- print(f"重置后获取Token成功: {self.email}")
- print(f"API Token: {api_token}")
- with open(os.path.join(DATA_DIR, 'api_tokens.txt'), 'a') as f:
- f.write(f"{self.email},{api_token}\n")
- return True, result
- return False, result
- else:
- # 如果获取失败,尝试重置token
- print(f"获取Token失败,尝试重置Token: {self.email}")
- success, reset_result = self.reset_token()
- if success:
- # 重置成功后再次获取
- import time
- time.sleep(1)
- response = self.session.get(url)
- result = response.json()
- if response.status_code == 200 and result.get('code') == 0:
- if 'data' in result:
- api_token = result['data'].get('token') or result['data'].get('api_token')
- if api_token:
- print(f"重置后获取Token成功: {self.email}")
- print(f"API Token: {api_token}")
- with open(os.path.join(DATA_DIR, 'api_tokens.txt'), 'a') as f:
- f.write(f"{self.email},{api_token}\n")
- return True, result
- return False, result
- except Exception as e:
- print(f"获取Token异常: {e}")
- return False, str(e)
- def get_credit(self):
- """查询账户额度信息"""
- # 确保有有效会话
- try:
- self.session.get(f'{self._host}/profile')
- except:
- pass
- url = f'{self._host}/api/user/credit-bundles'
- try:
- response = self.session.get(url)
- result = response.json()
- if response.status_code == 200 and result.get('code') == 0:
- if 'data' in result:
- credit_data = result['data']
- print(credit_data)
- total_credit = credit_data["summary"].get('total_credit', 0)
- used_credit = credit_data["summary"].get('used_credit', 0)
- available_credit = credit_data["summary"].get('available_credit', 0)
- print(f"额度查询成功: {self.email}")
- print(f" 总额度: {total_credit}")
- print(f" 已使用: {used_credit}")
- print(f" 可用额度: {available_credit}")
- # 保存到文件
- with open(os.path.join(DATA_DIR, 'credits.txt'), 'a') as f:
- f.write(f"{self.email},{total_credit},{used_credit},{available_credit}\n")
- return True, result
- else:
- print(f"额度数据为空: {self.email}")
- return False, result
- else:
- print(f"查询额度失败: {self.email} - {result}")
- return False, result
- except Exception as e:
- print(f"查询额度异常: {e}")
- return False, str(e)
- def save_cookies(self):
- """Save session cookies to file."""
- try:
- # 确保cookie目录存在
- cookie_dir = os.path.dirname(self.cookie_file)
- if not os.path.exists(cookie_dir):
- os.makedirs(cookie_dir)
-
- with open(self.cookie_file, 'wb') as f:
- pickle.dump({
- 'cookies': self.session.cookies,
- 'token': self.token
- }, f)
- print(f"Cookies已保存到 {self.cookie_file}")
- except Exception as e:
- print(f"保存Cookies失败: {e}")
- def load_cookies(self):
- """Load session cookies from file and validate."""
- if os.path.exists(self.cookie_file):
- try:
- with open(self.cookie_file, 'rb') as f:
- data = pickle.load(f)
- self.session.cookies.update(data['cookies'])
- self.token = data.get('token')
- if self.token:
- self.session.headers.update({'Authorization': f'Bearer {self.token}'})
- print(f"Cookies已加载: {self.cookie_file}")
- # 验证cookie有效性
- print("验证Cookie有效性...")
- url = f'{self._host}/api/user/credit-bundles'
- try:
- response = self.session.get(url)
- if response.status_code == 200 and response.json().get('code') == 0:
- print("Cookie有效")
- return True
- else:
- print(f"Cookie已失效 (状态码: {response.status_code}),需要重新登录")
- return False
- except Exception as e:
- print(f"验证Cookie时出错: {e},需要重新登录")
- return False
- except Exception as e:
- print(f"加载Cookies失败: {e}")
- return False
- return False
|