123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990 |
- #!/usr/bin/env python
- # coding: utf-8
- import os
- import sys
- import traceback
- import webbrowser
- import pyqrcode
- import requests
- import json
- import xml.dom.minidom
- import urllib
- import time
- import re
- import random
- from traceback import format_exc
- from requests.exceptions import ConnectionError, ReadTimeout
- import HTMLParser
- UNKONWN = 'unkonwn'
- SUCCESS = '200'
- SCANED = '201'
- TIMEOUT = '408'
- def show_image(file_path):
- """
- 跨平台显示图片文件
- :param file_path: 图片文件路径
- """
- if sys.version_info >= (3, 3):
- from shlex import quote
- else:
- from pipes import quote
- if sys.platform == "darwin":
- command = "open -a /Applications/Preview.app %s&" % quote(file_path)
- os.system(command)
- else:
- webbrowser.open(file_path)
- class SafeSession(requests.Session):
- def request(self, method, url, params=None, data=None, headers=None, cookies=None, files=None, auth=None,
- timeout=None, allow_redirects=True, proxies=None, hooks=None, stream=None, verify=None, cert=None,
- json=None):
- for i in range(3):
- try:
- return super(SafeSession, self).request(method, url, params, data, headers, cookies, files, auth,
- timeout,
- allow_redirects, proxies, hooks, stream, verify, cert, json)
- except Exception as e:
- print e.message, traceback.format_exc()
- continue
- class WXBot:
- """WXBot功能类"""
- def __init__(self):
- self.DEBUG = False
- self.uuid = ''
- self.base_uri = ''
- self.redirect_uri = ''
- self.uin = ''
- self.sid = ''
- self.skey = ''
- self.pass_ticket = ''
- self.device_id = 'e' + repr(random.random())[2:17]
- self.base_request = {}
- self.sync_key_str = ''
- self.sync_key = []
- self.sync_host = ''
- self.session = SafeSession()
- self.session.headers.update({'User-Agent': 'Mozilla/5.0 (X11; Linux i686; U;) Gecko/20070322 Kazehakase/0.4.5'})
- self.conf = {'qr': 'png'}
- self.my_account = {} # 当前账户
- # 所有相关账号: 联系人, 公众号, 群组, 特殊账号
- self.member_list = []
- # 所有群组的成员, {'group_id1': [member1, member2, ...], ...}
- self.group_members = {}
- # 所有账户, {'group_member':{'id':{'type':'group_member', 'info':{}}, ...}, 'normal_member':{'id':{}, ...}}
- self.account_info = {'group_member': {}, 'normal_member': {}}
- self.contact_list = [] # 联系人列表
- self.public_list = [] # 公众账号列表
- self.group_list = [] # 群聊列表
- self.special_list = [] # 特殊账号列表
- self.encry_chat_room_id_list = [] # 存储群聊的EncryChatRoomId,获取群内成员头像时需要用到
- @staticmethod
- def to_unicode(string, encoding='utf-8'):
- """
- 将字符串转换为Unicode
- :param string: 待转换字符串
- :param encoding: 字符串解码方式
- :return: 转换后的Unicode字符串
- """
- if isinstance(string, str):
- return string.decode(encoding)
- elif isinstance(string, unicode):
- return string
- else:
- raise Exception('Unknown Type')
- def get_contact(self):
- """获取当前账户的所有相关账号(包括联系人、公众号、群聊、特殊账号)"""
- url = self.base_uri + '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' \
- % (self.pass_ticket, self.skey, int(time.time()))
- r = self.session.post(url, data='{}')
- r.encoding = 'utf-8'
- if self.DEBUG:
- with open('contacts.json', 'w') as f:
- f.write(r.text.encode('utf-8'))
- dic = json.loads(r.text)
- self.member_list = dic['MemberList']
- special_users = ['newsapp', 'fmessage', 'filehelper', 'weibo', 'qqmail',
- 'fmessage', 'tmessage', 'qmessage', 'qqsync', 'floatbottle',
- 'lbsapp', 'shakeapp', 'medianote', 'qqfriend', 'readerapp',
- 'blogapp', 'facebookapp', 'masssendapp', 'meishiapp',
- 'feedsapp', 'voip', 'blogappweixin', 'weixin', 'brandsessionholder',
- 'weixinreminder', 'wxid_novlwrv3lqwv11', 'gh_22b87fa7cb3c',
- 'officialaccounts', 'notification_messages', 'wxid_novlwrv3lqwv11',
- 'gh_22b87fa7cb3c', 'wxitil', 'userexperience_alarm', 'notification_messages']
- self.contact_list = []
- self.public_list = []
- self.special_list = []
- self.group_list = []
- for contact in self.member_list:
- if contact['VerifyFlag'] & 8 != 0: # 公众号
- self.public_list.append(contact)
- self.account_info['normal_member'][contact['UserName']] = {'type': 'public', 'info': contact}
- elif contact['UserName'] in special_users: # 特殊账户
- self.special_list.append(contact)
- self.account_info['normal_member'][contact['UserName']] = {'type': 'special', 'info': contact}
- elif contact['UserName'].find('@@') != -1: # 群聊
- self.group_list.append(contact)
- self.account_info['normal_member'][contact['UserName']] = {'type': 'group', 'info': contact}
- elif contact['UserName'] == self.my_account['UserName']: # 自己
- self.account_info['normal_member'][contact['UserName']] = {'type': 'self', 'info': contact}
- else:
- self.contact_list.append(contact)
- self.account_info['normal_member'][contact['UserName']] = {'type': 'contact', 'info': contact}
- self.batch_get_group_members()
- for group in self.group_members:
- for member in self.group_members[group]:
- if member['UserName'] not in self.account_info:
- self.account_info['group_member'][member['UserName']] = \
- {'type': 'group_member', 'info': member, 'group': group}
- if self.DEBUG:
- with open('contact_list.json', 'w') as f:
- f.write(json.dumps(self.contact_list))
- with open('special_list.json', 'w') as f:
- f.write(json.dumps(self.special_list))
- with open('group_list.json', 'w') as f:
- f.write(json.dumps(self.group_list))
- with open('public_list.json', 'w') as f:
- f.write(json.dumps(self.public_list))
- with open('member_list.json', 'w') as f:
- f.write(json.dumps(self.member_list))
- with open('group_users.json', 'w') as f:
- f.write(json.dumps(self.group_members))
- with open('account_info.json', 'w') as f:
- f.write(json.dumps(self.account_info))
- return True
- def batch_get_group_members(self):
- """批量获取所有群聊成员信息"""
- url = self.base_uri + '/webwxbatchgetcontact?type=ex&r=%s&pass_ticket=%s' % (int(time.time()), self.pass_ticket)
- params = {
- 'BaseRequest': self.base_request,
- "Count": len(self.group_list),
- "List": [{"UserName": group['UserName'], "EncryChatRoomId": ""} for group in self.group_list]
- }
- r = self.session.post(url, data=json.dumps(params))
- r.encoding = 'utf-8'
- dic = json.loads(r.text)
- group_members = {}
- encry_chat_room_id = {}
- for group in dic['ContactList']:
- gid = group['UserName']
- members = group['MemberList']
- group_members[gid] = members
- encry_chat_room_id[gid] = group['EncryChatRoomId']
- self.group_members = group_members
- self.encry_chat_room_id_list = encry_chat_room_id
- def get_group_member_name(self, gid, uid):
- """
- 获取群聊中指定成员的名称信息
- :param gid: 群id
- :param uid: 群聊成员id
- :return: 名称信息,类似 {"display_name": "test_user", "nickname": "test", "remark_name": "for_test" }
- """
- if gid not in self.group_members:
- return None
- group = self.group_members[gid]
- for member in group:
- if member['UserName'] == uid:
- names = {}
- if 'RemarkName' in member and member['RemarkName']:
- names['remark_name'] = member['RemarkName']
- if 'NickName' in member and member['NickName']:
- names['nickname'] = member['NickName']
- if 'DisplayName' in member and member['DisplayName']:
- names['display_name'] = member['DisplayName']
- return names
- return None
- def get_contact_info(self, uid):
- return self.account_info['normal_member'].get(uid)
- def get_group_member_info(self, uid):
- return self.account_info['group_member'].get(uid)
- def get_contact_name(self, uid):
- info = self.get_contact_info(uid)
- if info is None:
- return None
- info = info['info']
- name = {}
- if 'RemarkName' in info and info['RemarkName']:
- name['remark_name'] = info['RemarkName']
- if 'NickName' in info and info['NickName']:
- name['nickname'] = info['NickName']
- if 'DisplayName' in info and info['DisplayName']:
- name['display_name'] = info['DisplayName']
- if len(name) == 0:
- return None
- else:
- return name
- @staticmethod
- def get_contact_prefer_name(name):
- if name is None:
- return None
- if 'remark_name' in name:
- return name['remark_name']
- if 'nickname' in name:
- return name['nickname']
- if 'display_name' in name:
- return name['display_name']
- return None
- @staticmethod
- def get_group_member_prefer_name(name):
- if name is None:
- return None
- if 'remark_name' in name:
- return name['remark_name']
- if 'display_name' in name:
- return name['display_name']
- if 'nickname' in name:
- return name['nickname']
- return None
- def get_user_type(self, wx_user_id):
- """
- 获取特定账号与自己的关系
- :param wx_user_id: 账号id:
- :return: 与当前账号的关系
- """
- for account in self.contact_list:
- if wx_user_id == account['UserName']:
- return 'contact'
- for account in self.public_list:
- if wx_user_id == account['UserName']:
- return 'public'
- for account in self.special_list:
- if wx_user_id == account['UserName']:
- return 'special'
- for account in self.group_list:
- if wx_user_id == account['UserName']:
- return 'group'
- for group in self.group_members:
- for member in self.group_members[group]:
- if member['UserName'] == wx_user_id:
- return 'group_member'
- return 'unknown'
- def is_contact(self, uid):
- for account in self.contact_list:
- if uid == account['UserName']:
- return True
- return False
- def is_public(self, uid):
- for account in self.public_list:
- if uid == account['UserName']:
- return True
- return False
- def is_special(self, uid):
- for account in self.special_list:
- if uid == account['UserName']:
- return True
- return False
- def handle_msg_all(self, msg):
- """
- 处理所有消息,请子类化后覆盖此函数
- msg:
- msg_id -> 消息id
- msg_type_id -> 消息类型id
- user -> 发送消息的账号id
- content -> 消息内容
- :param msg: 收到的消息
- """
- pass
- @staticmethod
- def proc_at_info(msg):
- if not msg:
- return '', []
- segs = msg.split(u'\u2005')
- str_msg_all = ''
- str_msg = ''
- infos = []
- if len(segs) > 1:
- for i in range(0, len(segs) - 1):
- segs[i] += u'\u2005'
- pm = re.search(u'@.*\u2005', segs[i]).group()
- if pm:
- name = pm[1:-1]
- string = segs[i].replace(pm, '')
- str_msg_all += string + '@' + name + ' '
- str_msg += string
- if string:
- infos.append({'type': 'str', 'value': string})
- infos.append({'type': 'at', 'value': name})
- else:
- infos.append({'type': 'str', 'value': segs[i]})
- str_msg_all += segs[i]
- str_msg += segs[i]
- str_msg_all += segs[-1]
- str_msg += segs[-1]
- infos.append({'type': 'str', 'value': segs[-1]})
- else:
- infos.append({'type': 'str', 'value': segs[-1]})
- str_msg_all = msg
- str_msg = msg
- return str_msg_all.replace(u'\u2005', ''), str_msg.replace(u'\u2005', ''), infos
- def extract_msg_content(self, msg_type_id, msg):
- """
- content_type_id:
- 0 -> Text
- 1 -> Location
- 3 -> Image
- 4 -> Voice
- 5 -> Recommend
- 6 -> Animation
- 7 -> Share
- 8 -> Video
- 9 -> VideoCall
- 10 -> Redraw
- 11 -> Empty
- 99 -> Unknown
- :param msg_type_id: 消息类型id
- :param msg: 消息结构体
- :return: 解析的消息
- """
- mtype = msg['MsgType']
- content = HTMLParser.HTMLParser().unescape(msg['Content'])
- msg_id = msg['MsgId']
- msg_content = {}
- if msg_type_id == 0:
- return {'type': 11, 'data': ''}
- elif msg_type_id == 2: # File Helper
- return {'type': 0, 'data': content.replace('<br/>', '\n')}
- elif msg_type_id == 3: # 群聊
- sp = content.find('<br/>')
- uid = content[:sp]
- content = content[sp:]
- content = content.replace('<br/>', '')
- uid = uid[:-1]
- name = self.get_contact_prefer_name(self.get_contact_name(uid))
- if not name:
- name = self.get_group_member_prefer_name(self.get_group_member_name(uid, msg['FromUserName']))
- if not name:
- name = 'unknown'
- msg_content['user'] = {'id': uid, 'name': name}
- else: # Self, Contact, Special, Public, Unknown
- pass
- msg_prefix = (msg_content['user']['name'] + ':') if 'user' in msg_content else ''
- if mtype == 1:
- if content.find('http://weixin.qq.com/cgi-bin/redirectforward?args=') != -1:
- r = self.session.get(content)
- r.encoding = 'gbk'
- data = r.text
- pos = self.search_content('title', data, 'xml')
- msg_content['type'] = 1
- msg_content['data'] = pos
- msg_content['detail'] = data
- if self.DEBUG:
- print ' %s[Location] %s ' % (msg_prefix, pos)
- else:
- msg_content['type'] = 0
- if msg_type_id == 3 or (msg_type_id == 1 and msg['ToUserName'][:2] == '@@'): # Group text message
- msg_infos = self.proc_at_info(content)
- str_msg_all = msg_infos[0]
- str_msg = msg_infos[1]
- detail = msg_infos[2]
- msg_content['data'] = str_msg_all
- msg_content['detail'] = detail
- msg_content['desc'] = str_msg
- else:
- msg_content['data'] = content
- if self.DEBUG:
- try:
- print ' %s[Text] %s' % (msg_prefix, msg_content['data'])
- except UnicodeEncodeError:
- print ' %s[Text] (illegal text).' % msg_prefix
- elif mtype == 3:
- msg_content['type'] = 3
- msg_content['data'] = self.get_msg_img_url(msg_id)
- msg_content['img'] = self.session.get(msg_content['data']).content.encode('hex')
- if self.DEBUG:
- image = self.get_msg_img(msg_id)
- print ' %s[Image] %s' % (msg_prefix, image)
- elif mtype == 34:
- msg_content['type'] = 4
- msg_content['data'] = self.get_voice_url(msg_id)
- msg_content['voice'] = self.session.get(msg_content['data']).content.encode('hex')
- if self.DEBUG:
- voice = self.get_voice(msg_id)
- print ' %s[Voice] %s' % (msg_prefix, voice)
- elif mtype == 42:
- msg_content['type'] = 5
- info = msg['RecommendInfo']
- msg_content['data'] = {'nickname': info['NickName'],
- 'alias': info['Alias'],
- 'province': info['Province'],
- 'city': info['City'],
- 'gender': ['unknown', 'male', 'female'][info['Sex']]}
- if self.DEBUG:
- print ' %s[Recommend]' % msg_prefix
- print ' -----------------------------'
- print ' | NickName: %s' % info['NickName']
- print ' | Alias: %s' % info['Alias']
- print ' | Local: %s %s' % (info['Province'], info['City'])
- print ' | Gender: %s' % ['unknown', 'male', 'female'][info['Sex']]
- print ' -----------------------------'
- elif mtype == 47:
- msg_content['type'] = 6
- msg_content['data'] = self.search_content('cdnurl', content)
- if self.DEBUG:
- print ' %s[Animation] %s' % (msg_prefix, msg_content['data'])
- elif mtype == 49:
- msg_content['type'] = 7
- if msg['AppMsgType'] == 3:
- app_msg_type = 'music'
- elif msg['AppMsgType'] == 5:
- app_msg_type = 'link'
- elif msg['AppMsgType'] == 7:
- app_msg_type = 'weibo'
- else:
- app_msg_type = 'unknown'
- msg_content['data'] = {'type': app_msg_type,
- 'title': msg['FileName'],
- 'desc': self.search_content('des', content, 'xml'),
- 'url': msg['Url'],
- 'from': self.search_content('appname', content, 'xml'),
- 'content': msg.get('Content') # 有的公众号会发一次性3 4条链接一个大图,如果只url那只能获取第一条,content里面有所有的链接
- }
- if self.DEBUG:
- print ' %s[Share] %s' % (msg_prefix, app_msg_type)
- print ' --------------------------'
- print ' | title: %s' % msg['FileName']
- print ' | desc: %s' % self.search_content('des', content, 'xml')
- print ' | link: %s' % msg['Url']
- print ' | from: %s' % self.search_content('appname', content, 'xml')
- print ' | content: %s' % msg.get('content')[:20]
- print ' --------------------------'
- elif mtype == 62:
- msg_content['type'] = 8
- msg_content['data'] = content
- if self.DEBUG:
- print ' %s[Video] Please check on mobiles' % msg_prefix
- elif mtype == 53:
- msg_content['type'] = 9
- msg_content['data'] = content
- if self.DEBUG:
- print ' %s[Video Call]' % msg_prefix
- elif mtype == 10002:
- msg_content['type'] = 10
- msg_content['data'] = content
- if self.DEBUG:
- print ' %s[Redraw]' % msg_prefix
- elif mtype == 10000: # unknown, maybe red packet, or group invite
- msg_content['type'] = 12
- msg_content['data'] = msg['Content']
- if self.DEBUG:
- print ' [Unknown]'
- else:
- msg_content['type'] = 99
- msg_content['data'] = content
- if self.DEBUG:
- print ' %s[Unknown]' % msg_prefix
- return msg_content
- def handle_msg(self, r):
- """
- 处理原始微信消息的内部函数
- msg_type_id:
- 0 -> Init
- 1 -> Self
- 2 -> FileHelper
- 3 -> Group
- 4 -> Contact
- 5 -> Public
- 6 -> Special
- 99 -> Unknown
- :param r: 原始微信消息
- """
- for msg in r['AddMsgList']:
- user = {'id': msg['FromUserName'], 'name': 'unknown'}
- if msg['MsgType'] == 51: # init message
- msg_type_id = 0
- user['name'] = 'system'
- elif msg['FromUserName'] == self.my_account['UserName']: # Self
- msg_type_id = 1
- user['name'] = 'self'
- elif msg['ToUserName'] == 'filehelper': # File Helper
- msg_type_id = 2
- user['name'] = 'file_helper'
- elif msg['FromUserName'][:2] == '@@': # Group
- msg_type_id = 3
- user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id']))
- elif self.is_contact(msg['FromUserName']): # Contact
- msg_type_id = 4
- user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id']))
- elif self.is_public(msg['FromUserName']): # Public
- msg_type_id = 5
- user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id']))
- elif self.is_special(msg['FromUserName']): # Special
- msg_type_id = 6
- user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id']))
- else:
- msg_type_id = 99
- user['name'] = 'unknown'
- if not user['name']:
- user['name'] = 'unknown'
- user['name'] = HTMLParser.HTMLParser().unescape(user['name'])
- if self.DEBUG and msg_type_id != 0:
- print '[MSG] %s:' % user['name']
- content = self.extract_msg_content(msg_type_id, msg)
- message = {'msg_type_id': msg_type_id,
- 'msg_id': msg['MsgId'],
- 'content': content,
- 'to_user_id': msg['ToUserName'],
- 'user': user}
- self.handle_msg_all(message)
- def schedule(self):
- """
- 做任务型事情的函数,如果需要,可以在子类中覆盖此函数
- 此函数在处理消息的间隙被调用,请不要长时间阻塞此函数
- """
- pass
- def proc_msg(self):
- self.test_sync_check()
- while True:
- check_time = time.time()
- try:
- [retcode, selector] = self.sync_check()
- # print '[DEBUG] sync_check:', retcode, selector
- if retcode == '1100': # 从微信客户端上登出
- break
- elif retcode == '1101': # 从其它设备上登了网页微信
- break
- elif retcode == '0':
- if selector == '2': # 有新消息
- r = self.sync()
- if r is not None:
- self.handle_msg(r)
- elif selector == '3': # 未知
- r = self.sync()
- if r is not None:
- self.handle_msg(r)
- elif selector == '6': # 可能是红包
- r = self.sync()
- if r is not None:
- self.handle_msg(r)
- elif selector == '7': # 在手机上操作了微信
- r = self.sync()
- if r is not None:
- self.handle_msg(r)
- elif selector == '0': # 无事件
- pass
- else:
- print '[DEBUG] sync_check:', retcode, selector
- r = self.sync()
- if r is not None:
- self.handle_msg(r)
- else:
- print '[DEBUG] sync_check:', retcode, selector
- self.schedule()
- except:
- print '[ERROR] Except in proc_msg'
- print format_exc()
- check_time = time.time() - check_time
- if check_time < 0.8:
- time.sleep(1 - check_time)
- def send_msg_by_uid(self, word, dst='filehelper'):
- url = self.base_uri + '/webwxsendmsg?pass_ticket=%s' % self.pass_ticket
- msg_id = str(int(time.time() * 1000)) + str(random.random())[:5].replace('.', '')
- word = self.to_unicode(word)
- params = {
- 'BaseRequest': self.base_request,
- 'Msg': {
- "Type": 1,
- "Content": word,
- "FromUserName": self.my_account['UserName'],
- "ToUserName": dst,
- "LocalID": msg_id,
- "ClientMsgId": msg_id
- }
- }
- headers = {'content-type': 'application/json; charset=UTF-8'}
- data = json.dumps(params, ensure_ascii=False).encode('utf8')
- try:
- r = self.session.post(url, data=data, headers=headers)
- except (ConnectionError, ReadTimeout):
- return False
- dic = r.json()
- return dic['BaseResponse']['Ret'] == 0
- def get_user_id(self, name):
- if name == '':
- return None
- name = self.to_unicode(name)
- for contact in self.contact_list:
- if 'RemarkName' in contact and contact['RemarkName'] == name:
- return contact['UserName']
- elif 'NickName' in contact and contact['NickName'] == name:
- return contact['UserName']
- elif 'DisplayName' in contact and contact['DisplayName'] == name:
- return contact['UserName']
- for group in self.group_list:
- if 'RemarkName' in group and group['RemarkName'] == name:
- return group['UserName']
- if 'NickName' in group and group['NickName'] == name:
- return group['UserName']
- if 'DisplayName' in group and group['DisplayName'] == name:
- return group['UserName']
- return ''
- def send_msg(self, name, word, isfile=False):
- uid = self.get_user_id(name)
- if uid is not None:
- if isfile:
- with open(word, 'r') as f:
- result = True
- for line in f.readlines():
- line = line.replace('\n', '')
- print '-> ' + name + ': ' + line
- if self.send_msg_by_uid(line, uid):
- pass
- else:
- result = False
- time.sleep(1)
- return result
- else:
- word = self.to_unicode(word)
- if self.send_msg_by_uid(word, uid):
- return True
- else:
- return False
- else:
- if self.DEBUG:
- print '[ERROR] This user does not exist .'
- return True
- @staticmethod
- def search_content(key, content, fmat='attr'):
- if fmat == 'attr':
- pm = re.search(key + '\s?=\s?"([^"<]+)"', content)
- if pm:
- return pm.group(1)
- elif fmat == 'xml':
- pm = re.search('<{0}>([^<]+)</{0}>'.format(key), content)
- if pm:
- return pm.group(1)
- return 'unknown'
- def run(self):
- self.get_uuid()
- self.gen_qr_code('qr.png')
- print '[INFO] Please use WeChat to scan the QR code .'
- result = self.wait4login()
- if result != SUCCESS:
- print '[ERROR] Web WeChat login failed. failed code=%s' % (result,)
- return
- if self.login():
- print '[INFO] Web WeChat login succeed .'
- else:
- print '[ERROR] Web WeChat login failed .'
- return
- if self.init():
- print '[INFO] Web WeChat init succeed .'
- else:
- print '[INFO] Web WeChat init failed'
- return
- self.status_notify()
- self.get_contact()
- print '[INFO] Get %d contacts' % len(self.contact_list)
- print '[INFO] Start to process messages .'
- self.proc_msg()
- def get_uuid(self):
- url = 'https://login.weixin.qq.com/jslogin'
- params = {
- 'appid': 'wx782c26e4c19acffb',
- 'fun': 'new',
- 'lang': 'zh_CN',
- '_': int(time.time()) * 1000 + random.randint(1, 999),
- }
- r = self.session.get(url, params=params)
- r.encoding = 'utf-8'
- data = r.text
- regx = r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)"'
- pm = re.search(regx, data)
- if pm:
- code = pm.group(1)
- self.uuid = pm.group(2)
- return code == '200'
- return False
- def gen_qr_code(self, qr_file_path):
- string = 'https://login.weixin.qq.com/l/' + self.uuid
- qr = pyqrcode.create(string)
- if self.conf['qr'] == 'png':
- qr.png(qr_file_path, scale=8)
- show_image(qr_file_path)
- # img = Image.open(qr_file_path)
- # img.show()
- elif self.conf['qr'] == 'tty':
- print(qr.terminal(quiet_zone=1))
- def do_request(self, url):
- r = self.session.get(url)
- r.encoding = 'utf-8'
- data = r.text
- param = re.search(r'window.code=(\d+);', data)
- code = param.group(1)
- return code, data
- def wait4login(self):
- """
- http comet:
- tip=1, 等待用户扫描二维码,
- 201: scaned
- 408: timeout
- tip=0, 等待用户确认登录,
- 200: confirmed
- """
- LOGIN_TEMPLATE = 'https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?tip=%s&uuid=%s&_=%s'
- tip = 1
- try_later_secs = 1
- MAX_RETRY_TIMES = 10
- code = UNKONWN
- retry_time = MAX_RETRY_TIMES
- while retry_time > 0:
- url = LOGIN_TEMPLATE % (tip, self.uuid, int(time.time()))
- code, data = self.do_request(url)
- if code == SCANED:
- print '[INFO] Please confirm to login .'
- tip = 0
- elif code == SUCCESS: # 确认登录成功
- param = re.search(r'window.redirect_uri="(\S+?)";', data)
- redirect_uri = param.group(1) + '&fun=new'
- self.redirect_uri = redirect_uri
- self.base_uri = redirect_uri[:redirect_uri.rfind('/')]
- return code
- elif code == TIMEOUT:
- print '[ERROR] WeChat login timeout. retry in %s secs later...' % (try_later_secs,)
- tip = 1 # 重置
- retry_time -= 1
- time.sleep(try_later_secs)
- else:
- print ('[ERROR] WeChat login exception return_code=%s. retry in %s secs later...' %
- (code, try_later_secs))
- tip = 1
- retry_time -= 1
- time.sleep(try_later_secs)
- return code
- def login(self):
- if len(self.redirect_uri) < 4:
- print '[ERROR] Login failed due to network problem, please try again.'
- return False
- r = self.session.get(self.redirect_uri)
- r.encoding = 'utf-8'
- data = r.text
- doc = xml.dom.minidom.parseString(data)
- root = doc.documentElement
- for node in root.childNodes:
- if node.nodeName == 'skey':
- self.skey = node.childNodes[0].data
- elif node.nodeName == 'wxsid':
- self.sid = node.childNodes[0].data
- elif node.nodeName == 'wxuin':
- self.uin = node.childNodes[0].data
- elif node.nodeName == 'pass_ticket':
- self.pass_ticket = node.childNodes[0].data
- if '' in (self.skey, self.sid, self.uin, self.pass_ticket):
- return False
- self.base_request = {
- 'Uin': self.uin,
- 'Sid': self.sid,
- 'Skey': self.skey,
- 'DeviceID': self.device_id,
- }
- return True
- def init(self):
- url = self.base_uri + '/webwxinit?r=%i&lang=en_US&pass_ticket=%s' % (int(time.time()), self.pass_ticket)
- params = {
- 'BaseRequest': self.base_request
- }
- r = self.session.post(url, data=json.dumps(params))
- r.encoding = 'utf-8'
- dic = json.loads(r.text)
- self.sync_key = dic['SyncKey']
- self.my_account = dic['User']
- self.sync_key_str = '|'.join([str(keyVal['Key']) + '_' + str(keyVal['Val'])
- for keyVal in self.sync_key['List']])
- return dic['BaseResponse']['Ret'] == 0
- def status_notify(self):
- url = self.base_uri + '/webwxstatusnotify?lang=zh_CN&pass_ticket=%s' % self.pass_ticket
- self.base_request['Uin'] = int(self.base_request['Uin'])
- params = {
- 'BaseRequest': self.base_request,
- "Code": 3,
- "FromUserName": self.my_account['UserName'],
- "ToUserName": self.my_account['UserName'],
- "ClientMsgId": int(time.time())
- }
- r = self.session.post(url, data=json.dumps(params))
- r.encoding = 'utf-8'
- dic = json.loads(r.text)
- return dic['BaseResponse']['Ret'] == 0
- def test_sync_check(self):
- for host in ['webpush', 'webpush2']:
- self.sync_host = host
- retcode = self.sync_check()[0]
- if retcode == '0':
- return True
- return False
- def sync_check(self):
- params = {
- 'r': int(time.time()),
- 'sid': self.sid,
- 'uin': self.uin,
- 'skey': self.skey,
- 'deviceid': self.device_id,
- 'synckey': self.sync_key_str,
- '_': int(time.time()),
- }
- url = 'https://' + self.sync_host + '.weixin.qq.com/cgi-bin/mmwebwx-bin/synccheck?' + urllib.urlencode(params)
- try:
- r = self.session.get(url, timeout=60)
- r.encoding = 'utf-8'
- data = r.text
- pm = re.search(r'window.synccheck=\{retcode:"(\d+)",selector:"(\d+)"\}', data)
- retcode = pm.group(1)
- selector = pm.group(2)
- return [retcode, selector]
- except:
- return [-1, -1]
- def sync(self):
- url = self.base_uri + '/webwxsync?sid=%s&skey=%s&lang=en_US&pass_ticket=%s' \
- % (self.sid, self.skey, self.pass_ticket)
- params = {
- 'BaseRequest': self.base_request,
- 'SyncKey': self.sync_key,
- 'rr': ~int(time.time())
- }
- try:
- r = self.session.post(url, data=json.dumps(params), timeout=60)
- r.encoding = 'utf-8'
- dic = json.loads(r.text)
- if dic['BaseResponse']['Ret'] == 0:
- self.sync_key = dic['SyncKey']
- self.sync_key_str = '|'.join([str(keyVal['Key']) + '_' + str(keyVal['Val'])
- for keyVal in self.sync_key['List']])
- return dic
- except:
- return None
- def get_icon(self, uid, gid=None):
- """
- 获取联系人或者群聊成员头像
- :param uid: 联系人id
- :param gid: 群id,如果为非None获取群中成员头像,如果为None则获取联系人头像
- """
- if gid is None:
- url = self.base_uri + '/webwxgeticon?username=%s&skey=%s' % (uid, self.skey)
- else:
- url = self.base_uri + '/webwxgeticon?username=%s&skey=%s&chatroomid=%s' % (
- uid, self.skey, self.encry_chat_room_id_list[gid])
- r = self.session.get(url)
- data = r.content
- fn = 'icon_' + uid + '.jpg'
- with open(fn, 'wb') as f:
- f.write(data)
- return fn
- def get_head_img(self, uid):
- """
- 获取群头像
- :param uid: 群uid
- """
- url = self.base_uri + '/webwxgetheadimg?username=%s&skey=%s' % (uid, self.skey)
- r = self.session.get(url)
- data = r.content
- fn = 'head_' + uid + '.jpg'
- with open(fn, 'wb') as f:
- f.write(data)
- return fn
- def get_msg_img_url(self, msgid):
- return self.base_uri + '/webwxgetmsgimg?MsgID=%s&skey=%s' % (msgid, self.skey)
- def get_msg_img(self, msgid):
- """
- 获取图片消息,下载图片到本地
- :param msgid: 消息id
- :return: 保存的本地图片文件路径
- """
- url = self.base_uri + '/webwxgetmsgimg?MsgID=%s&skey=%s' % (msgid, self.skey)
- r = self.session.get(url)
- data = r.content
- fn = 'img_' + msgid + '.jpg'
- with open(fn, 'wb') as f:
- f.write(data)
- return fn
- def get_voice_url(self, msgid):
- return self.base_uri + '/webwxgetvoice?msgid=%s&skey=%s' % (msgid, self.skey)
- def get_voice(self, msgid):
- """
- 获取语音消息,下载语音到本地
- :param msgid: 语音消息id
- :return: 保存的本地语音文件路径
- """
- url = self.base_uri + '/webwxgetvoice?msgid=%s&skey=%s' % (msgid, self.skey)
- r = self.session.get(url)
- data = r.content
- fn = 'voice_' + msgid + '.mp3'
- with open(fn, 'wb') as f:
- f.write(data)
- return fn
|