#!/usr/bin/env python
# coding: utf-8

import json
import random
import re
import time
import urllib
import xml.dom.minidom

import pyqrcode
import requests


class WXBot:
    """Client for the WeChat web endpoints used by this file.

    The usual entry point is :meth:`run`, which fetches a login uuid, renders
    it as a QR code, polls for the login result, initialises the session,
    loads the contact list and then loops forever in :meth:`proc_msg`.
    Subclasses customise behaviour by overriding :meth:`handle_msg_all` and
    :meth:`schedule`, which are no-ops here.
    """

    # UserName values that get_contact() files under ``special_list``.
    _SPECIAL_USERS = frozenset([
        'newsapp', 'fmessage', 'filehelper', 'weibo', 'qqmail', 'tmessage',
        'qmessage', 'qqsync', 'floatbottle', 'lbsapp', 'shakeapp',
        'medianote', 'qqfriend', 'readerapp', 'blogapp', 'facebookapp',
        'masssendapp', 'meishiapp', 'feedsapp', 'voip', 'blogappweixin',
        'weixin', 'brandsessionholder', 'weixinreminder',
        'wxid_novlwrv3lqwv11', 'gh_22b87fa7cb3c', 'officialaccounts',
        'notification_messages', 'wxitil', 'userexperience_alarm',
    ])

    def __init__(self):
        """Create an unauthenticated bot with an empty session state."""
        self.DEBUG = False
        self.uuid = ''
        self.base_uri = ''
        self.redirect_uri = ''
        self.uin = ''
        self.sid = ''
        self.skey = ''
        self.pass_ticket = ''
        self.device_id = 'e' + repr(random.random())[2:17]
        self.base_request = {}
        self.sync_key_str = ''
        self.sync_key = []
        self.user = {}
        self.member_list = []
        self.contact_list = []  # contact list
        self.public_list = []  # public account list
        self.group_list = []  # group chat list
        self.special_list = []  # special list account
        self.sync_host = ''
        self.session = requests.Session()
        self.session.headers.update(
            {'User-Agent': 'Mozilla/5.0 (X11; Linux i686; U;) Gecko/20070322 Kazehakase/0.4.5'})

        # 'png' writes the QR code to a file, 'tty' prints it to the terminal.
        self.conf = {'qr': 'png'}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _post_json(self, url, params):
        """POST *params* as a JSON body to *url* and return the decoded JSON reply."""
        r = self.session.post(url, data=json.dumps(params))
        r.encoding = 'utf-8'
        return json.loads(r.text)

    def _update_sync_key(self, sync_key):
        """Store *sync_key* and rebuild the ``Key_Val|Key_Val|...`` string from its 'List'."""
        self.sync_key = sync_key
        self.sync_key_str = '|'.join(
            [str(item['Key']) + '_' + str(item['Val']) for item in sync_key['List']])

    def _download(self, url, fn):
        """GET *url*, write the raw response body to file *fn* and return *fn*."""
        r = self.session.get(url)
        with open(fn, 'wb') as f:
            f.write(r.content)
        return fn

    # ------------------------------------------------------------------
    # Login
    # ------------------------------------------------------------------

    def get_uuid(self):
        """Request a login uuid and store it in ``self.uuid``.

        Returns True only when the reply could be parsed and its code is
        '200'; returns False otherwise.
        """
        url = 'https://login.weixin.qq.com/jslogin'
        params = {
            'appid': 'wx782c26e4c19acffb',
            'fun': 'new',
            'lang': 'zh_CN',
            '_': int(time.time())*1000 + random.randint(1, 999),
        }
        r = self.session.get(url, params=params)
        r.encoding = 'utf-8'
        data = r.text
        regx = r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)"'
        pm = re.search(regx, data)
        if pm:
            code = pm.group(1)
            self.uuid = pm.group(2)
            return code == '200'
        return False

    def gen_qr_code(self, qr_file_path):
        """Render the login URL for ``self.uuid`` as a QR code.

        With ``conf['qr'] == 'png'`` the code is written to *qr_file_path*;
        with ``'tty'`` it is printed to the terminal. Any other setting does
        nothing.
        """
        string = 'https://login.weixin.qq.com/l/' + self.uuid
        qr = pyqrcode.create(string)
        if self.conf['qr'] == 'png':
            qr.png(qr_file_path)
        elif self.conf['qr'] == 'tty':
            print(qr.terminal(quiet_zone=1))

    def wait4login(self, tip):
        """Poll the login endpoint once and interpret the returned code.

        *tip* is sent as the ``tip`` query parameter and is also the number
        of seconds slept before the request is made.

        Returns True for code '201', and for code '200' after storing
        ``redirect_uri`` and ``base_uri``. Returns False (after printing an
        error) for code '408', for any other code, and when the reply cannot
        be parsed.
        """
        time.sleep(tip)
        url = 'https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?tip=%s&uuid=%s&_=%s' \
              % (tip, self.uuid, int(time.time()))
        r = self.session.get(url)
        r.encoding = 'utf-8'
        data = r.text
        param = re.search(r'window.code=(\d+);', data)
        # A reply without a code is treated like any other unexpected code.
        code = param.group(1) if param else None

        if code == '201':
            return True
        if code == '200':
            param = re.search(r'window.redirect_uri="(\S+?)";', data)
            if param:
                redirect_uri = param.group(1) + '&fun=new'
                self.redirect_uri = redirect_uri
                self.base_uri = redirect_uri[:redirect_uri.rfind('/')]
                return True
        if code == '408':
            print('[ERROR] WeChat login timeout .')
        else:
            print('[ERROR] WeChat login exception .')
        return False

    def login(self):
        """Fetch ``redirect_uri`` and extract the session credentials.

        Reads the ``skey``, ``wxsid``, ``wxuin`` and ``pass_ticket`` elements
        of the XML reply into ``self.skey``, ``self.sid``, ``self.uin`` and
        ``self.pass_ticket`` and builds ``self.base_request`` from them.

        Returns False when no redirect URI is known or when any of the four
        values is still empty afterwards; True otherwise.
        """
        if not self.redirect_uri:
            # wait4login() never reached code '200'; there is nothing to fetch.
            return False

        r = self.session.get(self.redirect_uri)
        r.encoding = 'utf-8'
        data = r.text
        doc = xml.dom.minidom.parseString(data)
        root = doc.documentElement

        # XML element name -> attribute of self that receives its text.
        fields = {'skey': 'skey', 'wxsid': 'sid', 'wxuin': 'uin', 'pass_ticket': 'pass_ticket'}
        for node in root.childNodes:
            attr = fields.get(node.nodeName)
            # An empty element has no text child; leave the attribute as ''.
            if attr is not None and node.childNodes:
                setattr(self, attr, node.childNodes[0].data)

        if '' in (self.skey, self.sid, self.uin, self.pass_ticket):
            return False

        self.base_request = {
            'Uin': self.uin,
            'Sid': self.sid,
            'Skey': self.skey,
            'DeviceID': self.device_id,
        }
        return True

    def init(self):
        """Call ``webwxinit`` and store the returned 'SyncKey' and 'User'.

        Returns True when the reply's ``BaseResponse.Ret`` is 0.
        """
        url = self.base_uri + '/webwxinit?r=%i&lang=en_US&pass_ticket=%s' % (int(time.time()), self.pass_ticket)
        params = {
            'BaseRequest': self.base_request
        }
        dic = self._post_json(url, params)
        self.user = dic['User']
        self._update_sync_key(dic['SyncKey'])
        return dic['BaseResponse']['Ret'] == 0

    def status_notify(self):
        """Call ``webwxstatusnotify`` with the logged-in user as sender and receiver.

        Converts ``base_request['Uin']`` to an int in place before sending.
        Returns True when the reply's ``BaseResponse.Ret`` is 0.
        """
        url = self.base_uri + '/webwxstatusnotify?lang=zh_CN&pass_ticket=%s' % self.pass_ticket
        self.base_request['Uin'] = int(self.base_request['Uin'])
        params = {
            'BaseRequest': self.base_request,
            "Code": 3,
            "FromUserName": self.user['UserName'],
            "ToUserName": self.user['UserName'],
            "ClientMsgId": int(time.time())
        }
        dic = self._post_json(url, params)
        return dic['BaseResponse']['Ret'] == 0

    # ------------------------------------------------------------------
    # Contacts
    # ------------------------------------------------------------------

    def get_contact(self):
        """Fetch the member list and split it into the four account lists.

        Each member goes to the first matching category: public account
        (``VerifyFlag & 8``), special account (UserName listed in
        ``_SPECIAL_USERS``), group (UserName contains '@@'), the logged-in
        user (dropped), otherwise ordinary contact. With ``DEBUG`` set the
        raw reply and every list are also dumped to JSON files in the
        current directory. Always returns True.
        """
        url = self.base_uri + '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' \
              % (self.pass_ticket, self.skey, int(time.time()))
        r = self.session.post(url, data='{}')
        r.encoding = 'utf-8'
        if self.DEBUG:
            # Binary mode: the payload written is already UTF-8 encoded bytes.
            with open('contacts.json', 'wb') as f:
                f.write(r.text.encode('utf-8'))
        dic = json.loads(r.text)
        self.member_list = dic['MemberList']

        self.contact_list = []
        self.public_list = []
        self.special_list = []
        self.group_list = []

        for contact in self.member_list:
            user_name = contact['UserName']
            if contact['VerifyFlag'] & 8 != 0:  # public account
                self.public_list.append(contact)
            elif user_name in self._SPECIAL_USERS:  # special account
                self.special_list.append(contact)
            elif user_name.find('@@') != -1:  # group
                self.group_list.append(contact)
            elif user_name == self.user['UserName']:  # self
                pass
            else:
                self.contact_list.append(contact)

        if self.DEBUG:
            dumps = (
                ('contact_list.json', self.contact_list),
                ('special_list.json', self.special_list),
                ('group_list.json', self.group_list),
                ('public_list.json', self.public_list),
                ('member_list.json', self.member_list),
            )
            for path, items in dumps:
                with open(path, 'w') as f:
                    f.write(json.dumps(items))

        return True

    def batch_get_contact(self):
        """Call ``webwxbatchgetcontact`` for every group in ``group_list``.

        Returns the decoded JSON reply as a dict.
        """
        url = self.base_uri + '/webwxbatchgetcontact?type=ex&r=%s&pass_ticket=%s' % (int(time.time()), self.pass_ticket)
        params = {
            'BaseRequest': self.base_request,
            "Count": len(self.group_list),
            "List": [{"UserName": g['UserName'], "EncryChatRoomId": ""} for g in self.group_list]
        }
        # Serialise the body as JSON, like every other POST in this class;
        # passing the dict directly would form-encode it instead.
        return self._post_json(url, params)

    # ------------------------------------------------------------------
    # Synchronisation
    # ------------------------------------------------------------------

    def test_sync_check(self):
        """Pick the first sync host whose ``synccheck`` returns retcode '0'.

        Tries 'webpush' then 'webpush2', leaving the last one tried in
        ``self.sync_host``. Returns True on the first success, False if
        neither host answers with retcode '0'.
        """
        for host in ['webpush', 'webpush2']:
            self.sync_host = host
            retcode = self.sync_check()[0]
            if retcode == '0':
                return True
        return False

    def sync_check(self):
        """Query ``synccheck`` on the current sync host.

        Returns ``[retcode, selector]``, both as strings of digits.
        Raises AttributeError when the reply does not have the expected
        ``window.synccheck={...}`` form.
        """
        params = {
            'r': int(time.time()),
            'sid': self.sid,
            'uin': self.uin,
            'skey': self.skey,
            'deviceid': self.device_id,
            'synckey': self.sync_key_str,
            '_': int(time.time()),
        }
        url = 'https://' + self.sync_host + '.weixin.qq.com/cgi-bin/mmwebwx-bin/synccheck'
        # Let requests build the query string, as get_uuid() does.
        r = self.session.get(url, params=params)
        r.encoding = 'utf-8'
        data = r.text
        pm = re.search(r'window.synccheck=\{retcode:"(\d+)",selector:"(\d+)"\}', data)
        retcode = pm.group(1)
        selector = pm.group(2)
        return [retcode, selector]

    def sync(self):
        """Call ``webwxsync`` and return the decoded JSON reply.

        When the reply's ``BaseResponse.Ret`` is 0 the stored sync key is
        replaced by the one in the reply.
        """
        url = self.base_uri + '/webwxsync?sid=%s&skey=%s&lang=en_US&pass_ticket=%s' \
              % (self.sid, self.skey, self.pass_ticket)
        params = {
            'BaseRequest': self.base_request,
            'SyncKey': self.sync_key,
            'rr': ~int(time.time())
        }
        dic = self._post_json(url, params)
        if dic['BaseResponse']['Ret'] == 0:
            self._update_sync_key(dic['SyncKey'])
        return dic

    # ------------------------------------------------------------------
    # Media
    # ------------------------------------------------------------------

    def get_icon(self, uid):
        """Download ``webwxgeticon`` for *uid* to ``img_<uid>.jpg`` and return that file name."""
        url = self.base_uri + '/webwxgeticon?username=%s&skey=%s' % (uid, self.skey)
        return self._download(url, 'img_'+uid+'.jpg')

    def get_head_img(self, uid):
        """Download ``webwxgetheadimg`` for *uid* to ``img_<uid>.jpg`` and return that file name."""
        url = self.base_uri + '/webwxgetheadimg?username=%s&skey=%s' % (uid, self.skey)
        return self._download(url, 'img_'+uid+'.jpg')

    def get_msg_img_url(self, msgid):
        """Return the ``webwxgetmsgimg`` URL for message *msgid*."""
        return self.base_uri + '/webwxgetmsgimg?MsgID=%s&skey=%s' % (msgid, self.skey)

    def get_msg_img(self, msgid):
        """Download the image of message *msgid* to ``img_<msgid>.jpg`` and return that file name."""
        return self._download(self.get_msg_img_url(msgid), 'img_'+msgid+'.jpg')

    def get_voice_url(self, msgid):
        """Return the ``webwxgetvoice`` URL for message *msgid*."""
        return self.base_uri + '/webwxgetvoice?msgid=%s&skey=%s' % (msgid, self.skey)

    def get_voice(self, msgid):
        """Download the voice clip of message *msgid* to ``voice_<msgid>.mp3`` and return that file name."""
        return self._download(self.get_voice_url(msgid), 'voice_'+msgid+'.mp3')

    # ------------------------------------------------------------------
    # Lookups
    # ------------------------------------------------------------------

    def get_user_remark_name(self, uid):
        """Return the RemarkName, or failing that the NickName, of member *uid*.

        When *uid* is not in ``member_list`` the result is 'unknown group'
        for ids starting with '@@' and 'stranger' otherwise. If several
        members share the id, the last one wins.
        """
        name = 'unknown group' if uid[:2] == '@@' else 'stranger'
        for member in self.member_list:
            if member['UserName'] == uid:
                name = member['RemarkName'] if member['RemarkName'] else member['NickName']
        return name

    def get_user_id(self, name):
        """Return the UserName of the first member whose RemarkName, NickName
        or UserName equals *name*, or None when there is no such member."""
        for member in self.member_list:
            if name in (member['RemarkName'], member['NickName'], member['UserName']):
                return member['UserName']
        return None

    def get_user_type(self, wx_user_id):
        """Classify *wx_user_id* as 'contact', 'public', 'special', 'group' or 'unknown'.

        The lists are searched in that order and the first hit decides.
        """
        categories = (
            ('contact', self.contact_list),
            ('public', self.public_list),
            ('special', self.special_list),
            ('group', self.group_list),
        )
        for label, accounts in categories:
            for account in accounts:
                if wx_user_id == account['UserName']:
                    return label
        return 'unknown'

    def is_contact(self, uid):
        """Return True when *uid* is the UserName of an entry in ``contact_list``."""
        return any(uid == account['UserName'] for account in self.contact_list)

    def is_public(self, uid):
        """Return True when *uid* is the UserName of an entry in ``public_list``."""
        return any(uid == account['UserName'] for account in self.public_list)

    def is_special(self, uid):
        """Return True when *uid* is the UserName of an entry in ``special_list``."""
        return any(uid == account['UserName'] for account in self.special_list)

    # ------------------------------------------------------------------
    # Message handling
    # ------------------------------------------------------------------

    def handle_msg_all(self, msg):
        """Hook called by handle_msg() for every message; does nothing here.

        *msg* is a dict with the keys 'msg_type_id', 'msg_id', 'content'
        and 'user'.
        """
        pass

    def extract_msg_content(self, msg_type_id, msg):
        """Turn a raw message dict into ``{'type': ..., 'data': ...}``.

        *msg_type_id* is the sender category computed by handle_msg().
        The returned 'type' is the content type id:

            0 -> Text        1 -> Location    3 -> Image
            4 -> Voice       5 -> Recommend   6 -> Animation
            7 -> Share       8 -> Video       9 -> VideoCall
            10 -> Redraw     11 -> Empty      99 -> Unknown

        Group messages additionally carry a 'user' entry naming the group
        member who sent them, and location messages a 'detail' entry with
        the fetched page text. With ``DEBUG`` set a summary is printed and
        image/voice attachments are downloaded.
        """
        mtype = msg['MsgType']
        # Angle brackets arrive HTML-escaped in the message content.
        content = msg['Content'].replace('&lt;', '<').replace('&gt;', '>')
        msg_id = msg['MsgId']

        msg_content = {}
        if msg_type_id == 0:
            return {'type': 11, 'data': ''}
        elif msg_type_id == 2:  # File Helper
            return {'type': 0, 'data': content.replace('<br/>', '\n')}
        elif msg_type_id == 3:  # Group
            # Group content has the form '<sender id>:<br/><text>'.
            sender, sep, body = content.partition('<br/>')
            if sep:
                uid = sender[:-1]  # drop the trailing ':'
                content = body.replace('<br/>', '')
            else:
                # No sender prefix: keep the content whole instead of slicing
                # it with the -1 that a failed find() would have produced.
                uid = ''
            msg_content['user'] = {'id': uid, 'name': self.get_user_remark_name(uid)}
            if self.DEBUG:
                print(msg_content['user']['name'])
        else:  # Self, Contact, Special, Public, Unknown
            pass

        if mtype == 1:
            if content.find('http://weixin.qq.com/cgi-bin/redirectforward?args=') != -1:
                # The content itself is the URL of the location page.
                r = self.session.get(content)
                r.encoding = 'gbk'
                data = r.text
                pos = self.search_content('title', data, 'xml')
                msg_content['type'] = 1
                msg_content['data'] = pos
                msg_content['detail'] = data
                if self.DEBUG:
                    print(' [Location] I am at %s ' % pos)
            else:
                msg_content['type'] = 0
                msg_content['data'] = content.replace(u'\u2005', '')
                if self.DEBUG:
                    print(' [Text] %s' % msg_content['data'])
        elif mtype == 3:
            msg_content['type'] = 3
            msg_content['data'] = self.get_msg_img_url(msg_id)
            if self.DEBUG:
                image = self.get_msg_img(msg_id)
                print(' [Image] %s' % image)
        elif mtype == 34:
            msg_content['type'] = 4
            msg_content['data'] = self.get_voice_url(msg_id)
            if self.DEBUG:
                voice = self.get_voice(msg_id)
                print(' [Voice] %s' % voice)
        elif mtype == 42:
            msg_content['type'] = 5
            info = msg['RecommendInfo']
            gender = ['unknown', 'male', 'female'][info['Sex']]
            msg_content['data'] = {'nickname': info['NickName'],
                                   'alias': info['Alias'],
                                   'province': info['Province'],
                                   'city': info['City'],
                                   'gender': gender}
            if self.DEBUG:
                print(' [Recommend]')
                print(' -----------------------------')
                print(' | NickName: %s' % info['NickName'])
                print(' | Alias: %s' % info['Alias'])
                print(' | Local: %s %s' % (info['Province'], info['City']))
                print(' | Gender: %s' % gender)
                print(' -----------------------------')
        elif mtype == 47:
            msg_content['type'] = 6
            msg_content['data'] = self.search_content('cdnurl', content)
            if self.DEBUG:
                print(' [Animation] %s' % msg_content['data'])
        elif mtype == 49:
            msg_content['type'] = 7
            app_msg_type = {3: 'music', 5: 'link', 7: 'weibo'}.get(msg['AppMsgType'], 'unknown')
            desc = self.search_content('des', content, 'xml')
            app_name = self.search_content('appname', content, 'xml')
            msg_content['data'] = {'type': app_msg_type,
                                   'title': msg['FileName'],
                                   'desc': desc,
                                   'url': msg['Url'],
                                   'from': app_name}
            if self.DEBUG:
                print(' [Share] %s' % app_msg_type)
                print(' --------------------------')
                print(' | title: %s' % msg['FileName'])
                print(' | desc: %s' % desc)
                print(' | link: %s' % msg['Url'])
                print(' | from: %s' % app_name)
                print(' --------------------------')
        elif mtype == 62:
            msg_content['type'] = 8
            msg_content['data'] = content
            if self.DEBUG:
                print(' [Video] Please check on mobiles')
        elif mtype == 53:
            msg_content['type'] = 9
            msg_content['data'] = content
            if self.DEBUG:
                print(' [Video Call]')
        elif mtype == 10002:
            msg_content['type'] = 10
            msg_content['data'] = content
            if self.DEBUG:
                print(' [Redraw]')
        else:
            msg_content['type'] = 99
            msg_content['data'] = content
            if self.DEBUG:
                print(' [Unknown]')
        return msg_content

    def handle_msg(self, r):
        """Classify every message in ``r['AddMsgList']`` and pass it to handle_msg_all().

        The sender category stored as 'msg_type_id' is:

            0 -> Init        1 -> Self        2 -> FileHelper
            3 -> Group       4 -> Contact     5 -> Public
            6 -> Special     99 -> Unknown
        """
        for msg in r['AddMsgList']:
            msg_type_id = 99
            sender = msg['FromUserName']
            user = {'id': sender}
            if msg['MsgType'] == 51:  # init message
                msg_type_id = 0
            elif sender == self.user['UserName']:  # Self
                msg_type_id = 1
                user['name'] = 'self'
            elif msg['ToUserName'] == 'filehelper':  # File Helper
                msg_type_id = 2
                user['name'] = 'file_helper'
            elif sender[:2] == '@@':  # Group
                msg_type_id = 3
                user['name'] = self.get_user_remark_name(user['id'])
                if self.DEBUG:
                    print('[From] %s' % user['name'])
            elif self.is_contact(sender):  # Contact
                msg_type_id = 4
                user['name'] = self.get_user_remark_name(user['id'])
            elif self.is_public(sender):  # Public
                msg_type_id = 5
                user['name'] = self.get_user_remark_name(user['id'])
            elif self.is_special(sender):  # Special
                msg_type_id = 6
                user['name'] = self.get_user_remark_name(user['id'])
            else:
                user['name'] = 'unknown'  # Unknown

            # Init messages never set user['name'], so they are not announced.
            if self.DEBUG and msg_type_id != 0:
                print('[MSG] %s:' % user['name'])
            content = self.extract_msg_content(msg_type_id, msg)
            message = {'msg_type_id': msg_type_id,
                       'msg_id': msg['MsgId'],
                       'content': content,
                       'user': user}
            self.handle_msg_all(message)

    def schedule(self):
        """Hook called once per iteration of the proc_msg() loop; does nothing here."""
        pass

    def proc_msg(self):
        """Select a sync host, then poll for and dispatch messages forever.

        On retcode '0', selectors '2' and '7' trigger a sync() whose result
        is fed to handle_msg(); selector '0' sleeps for one second. Retcode
        '1100' and anything else are ignored. schedule() runs after every
        poll. This method does not return.
        """
        self.test_sync_check()
        while True:
            retcode, selector = self.sync_check()
            if retcode == '1100':  # User have login on mobile
                pass
            elif retcode == '0':
                # '7' is reported as: Play WeChat on mobile
                if selector in ('2', '7'):
                    r = self.sync()
                    if r is not None:
                        self.handle_msg(r)
                elif selector == '0':
                    time.sleep(1)
            self.schedule()

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    def send_msg_by_uid(self, word, dst='filehelper'):
        """Send the text *word* to the user id *dst*.

        Byte strings are decoded as UTF-8 first so that the JSON body can be
        built without mixing byte and text strings. Returns True when the
        reply's ``BaseResponse.Ret`` is 0.
        """
        url = self.base_uri + '/webwxsendmsg?pass_ticket=%s' % self.pass_ticket
        msg_id = str(int(time.time()*1000)) + str(random.random())[:5].replace('.', '')
        # On Python 2 ``bytes`` is ``str``, so this catches plain byte strings.
        if isinstance(word, bytes):
            word = word.decode('utf-8')
        params = {
            'BaseRequest': self.base_request,
            'Msg': {
                "Type": 1,
                "Content": word,
                "FromUserName": self.user['UserName'],
                "ToUserName": dst,
                "LocalID": msg_id,
                "ClientMsgId": msg_id
            }
        }
        headers = {'content-type': 'application/json; charset=UTF-8'}
        data = json.dumps(params, ensure_ascii=False).encode('utf8')
        r = self.session.post(url, data=data, headers=headers)
        dic = r.json()
        return dic['BaseResponse']['Ret'] == 0

    def send_msg(self, name, word, isfile=False):
        """Send *word* to the member called *name*.

        *name* is resolved with get_user_id(). With *isfile* set, *word* is
        a path and each line of that file is sent as its own message, one
        second apart; the result is then False if any line failed.

        Returns the send result, or True when *name* cannot be resolved.
        """
        uid = self.get_user_id(name)
        if not uid:
            if self.DEBUG:
                print('[ERROR] This user does not exist .')
            # NOTE(review): reporting True for an unknown user looks
            # unintended, but is kept because callers may rely on it.
            return True

        if not isfile:
            return bool(self.send_msg_by_uid(word, uid))

        result = True
        with open(word, 'r') as f:
            for line in f:
                line = line.replace('\n', '')
                print('-> ' + name + ': ' + line)
                if not self.send_msg_by_uid(line, uid):
                    result = False
                time.sleep(1)
        return result

    @staticmethod
    def search_content(key, content, fmat='attr'):
        """Extract the value stored under *key* in *content*.

        With ``fmat='attr'`` the value of ``key="value"`` is returned, with
        ``fmat='xml'`` the text following ``<key>``. *key* is inserted into
        the regular expression unescaped. Returns 'unknown' when nothing
        matches or *fmat* is neither of the two.
        """
        if fmat == 'attr':
            pm = re.search(key + r'\s?=\s?"([^"<]+)"', content)
            if pm:
                return pm.group(1)
        elif fmat == 'xml':
            pm = re.search('<{0}>([^<]+)'.format(key), content)
            if pm:
                return pm.group(1)
        return 'unknown'

    def run(self):
        """Log in interactively and process messages until interrupted.

        Returns early, after printing a message, when the uuid request, the
        login or the initialisation fails.
        """
        if not self.get_uuid():
            # Without a valid uuid the QR code would not lead to a login.
            print('[ERROR] Web WeChat get uuid failed .')
            return
        self.gen_qr_code('qr.png')
        print('[INFO] Please use WeChat to scan the QR code .')
        self.wait4login(1)
        print('[INFO] Please confirm to login .')
        self.wait4login(0)
        if self.login():
            print('[INFO] Web WeChat login succeed .')
        else:
            print('[ERROR] Web WeChat login failed .')
            return
        if self.init():
            print('[INFO] Web WeChat init succeed .')
        else:
            print('[INFO] Web WeChat init failed')
            return
        self.status_notify()
        self.get_contact()
        print('[INFO] Get %d contacts' % len(self.contact_list))
        print('[INFO] Start to process messages .')
        self.proc_msg()