|
@@ -7,6 +7,7 @@ import traceback
|
|
|
import webbrowser
|
|
|
import pyqrcode
|
|
|
import requests
|
|
|
+import mimetypes
|
|
|
import json
|
|
|
import xml.dom.minidom
|
|
|
import urllib
|
|
@@ -93,6 +94,8 @@ class WXBot:
|
|
|
self.special_list = [] # 特殊账号列表
|
|
|
self.encry_chat_room_id_list = [] # 存储群聊的EncryChatRoomId,获取群内成员头像时需要用到
|
|
|
|
|
|
+ self.file_index = 0
|
|
|
+
|
|
|
@staticmethod
|
|
|
def to_unicode(string, encoding='utf-8'):
|
|
|
"""
|
|
@@ -645,6 +648,100 @@ class WXBot:
|
|
|
dic = r.json()
|
|
|
return dic['BaseResponse']['Ret'] == 0
|
|
|
|
|
|
+ def upload_media(self, fpath, is_img=False):
|
|
|
+ if not os.path.exists(fpath):
|
|
|
+ print '[ERROR] File not exists.'
|
|
|
+ return None
|
|
|
+ url_1 = 'https://file.wx.qq.com/cgi-bin/mmwebwx-bin/webwxuploadmedia?f=json'
|
|
|
+ url_2 = 'https://file2.wx.qq.com/cgi-bin/mmwebwx-bin/webwxuploadmedia?f=json'
|
|
|
+ flen = str(os.path.getsize(fpath))
|
|
|
+ ftype = mimetypes.guess_type(fpath)[0] or 'application/octet-stream'
|
|
|
+ files = {
|
|
|
+ 'id': (None, 'WU_FILE_%s' % str(self.file_index)),
|
|
|
+ 'name': (None, os.path.basename(fpath)),
|
|
|
+ 'type': (None, ftype),
|
|
|
+ 'lastModifiedDate': (None, time.strftime('%m/%d/%Y, %H:%M:%S GMT+0800 (CST)')),
|
|
|
+ 'size': (None, flen),
|
|
|
+ 'mediatype': (None, 'pic' if is_img else 'doc'),
|
|
|
+ 'uploadmediarequest': (None, json.dumps({
|
|
|
+ 'BaseRequest': self.base_request,
|
|
|
+ 'ClientMediaId': int(time.time()),
|
|
|
+ 'TotalLen': flen,
|
|
|
+ 'StartPos': 0,
|
|
|
+ 'DataLen': flen,
|
|
|
+ 'MediaType': 4,
|
|
|
+ })),
|
|
|
+ 'webwx_data_ticket': (None, self.session.cookies['webwx_data_ticket']),
|
|
|
+ 'pass_ticket': (None, self.pass_ticket),
|
|
|
+ 'filename': (os.path.basename(fpath), open(fpath, 'rb'),ftype.split('/')[1]),
|
|
|
+ }
|
|
|
+ self.file_index += 1
|
|
|
+ try:
|
|
|
+ r = self.session.post(url_1, files=files)
|
|
|
+ if json.loads(r.text)['BaseResponse']['Ret'] != 0:
|
|
|
+ # 当file返回值不为0时则为上传失败,尝试第二服务器上传
|
|
|
+ r = self.session.post(url_2, files=files)
|
|
|
+ if json.loads(r.text)['BaseResponse']['Ret'] != 0:
|
|
|
+ print '[ERROR] Upload media failure.'
|
|
|
+ return None
|
|
|
+ mid = json.loads(r.text)['MediaId']
|
|
|
+ return mid
|
|
|
+ except Exception,e:
|
|
|
+ return None
|
|
|
+
|
|
|
+ def send_file_msg_by_uid(self, fpath, uid):
|
|
|
+ mid = self.upload_media(fpath)
|
|
|
+ if mid is None or not mid:
|
|
|
+ return False
|
|
|
+ url = self.base_uri + '/webwxsendappmsg?fun=async&f=json&pass_ticket=' + self.pass_ticket
|
|
|
+ msg_id = str(int(time.time() * 1000)) + str(random.random())[:5].replace('.', '')
|
|
|
+ data = {
|
|
|
+ 'BaseRequest': self.base_request,
|
|
|
+ 'Msg': {
|
|
|
+ 'Type': 6,
|
|
|
+ 'Content': ("<appmsg appid='wxeb7ec651dd0aefa9' sdkver=''><title>%s</title><des></des><action></action><type>6</type><content></content><url></url><lowurl></lowurl><appattach><totallen>%s</totallen><attachid>%s</attachid><fileext>%s</fileext></appattach><extinfo></extinfo></appmsg>" % (os.path.basename(fpath).encode('utf-8'), str(os.path.getsize(fpath)), mid, fpath.split('.')[-1])).encode('utf8'),
|
|
|
+ 'FromUserName': self.my_account['UserName'],
|
|
|
+ 'ToUserName': uid,
|
|
|
+ 'LocalID': msg_id,
|
|
|
+ 'ClientMsgId': msg_id, }, }
|
|
|
+ try:
|
|
|
+ r = self.session.post(url, data=json.dumps(data))
|
|
|
+ res = json.loads(r.text)
|
|
|
+ if res['BaseResponse']['Ret'] == 0:
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ return False
|
|
|
+ except Exception,e:
|
|
|
+ return False
|
|
|
+
|
|
|
+ def send_img_msg_by_uid(self, fpath, uid):
|
|
|
+ mid = self.upload_media(fpath, is_img=True)
|
|
|
+ if mid is None:
|
|
|
+ return False
|
|
|
+ url = self.base_uri + '/webwxsendmsgimg?fun=async&f=json'
|
|
|
+ data = {
|
|
|
+ 'BaseRequest': self.base_request,
|
|
|
+ 'Msg': {
|
|
|
+ 'Type': 3,
|
|
|
+ 'MediaId': mid,
|
|
|
+ 'FromUserName': self.my_account['UserName'],
|
|
|
+ 'ToUserName': uid,
|
|
|
+ 'LocalID': str(time.time() * 1e7),
|
|
|
+ 'ClientMsgId': str(time.time() * 1e7), }, }
|
|
|
+ if fpath[-4:] == '.gif':
|
|
|
+ url = self.base_uri + '/webwxsendemoticon?fun=sys'
|
|
|
+ data['Msg']['Type'] = 47
|
|
|
+ data['Msg']['EmojiFlag'] = 2
|
|
|
+ try:
|
|
|
+ r = self.session.post(url, data=json.dumps(data))
|
|
|
+ res = json.loads(r.text)
|
|
|
+ if res['BaseResponse']['Ret'] == 0:
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ return False
|
|
|
+ except Exception,e:
|
|
|
+ return False
|
|
|
+
|
|
|
def get_user_id(self, name):
|
|
|
if name == '':
|
|
|
return None
|