#!/usr/bin/env python # coding: utf-8 import os import sys import traceback import webbrowser import pyqrcode import requests import mimetypes import json import xml.dom.minidom import urllib import time import re import random from traceback import format_exc from requests.exceptions import ConnectionError, ReadTimeout from Queue import Queue import HTMLParser import threading import pickle UNKONWN = 'unkonwn' SUCCESS = '200' SCANED = '201' TIMEOUT = '408' def show_image(file_path): """ 跨平台显示图片文件 :param file_path: 图片文件路径 """ if sys.version_info >= (3, 3): from shlex import quote else: from pipes import quote if sys.platform == "darwin": command = "open -a /Applications/Preview.app %s&" % quote(file_path) os.system(command) else: webbrowser.open(os.path.join(os.getcwd(),'temp',file_path)) class SafeSession(requests.Session): def request(self, method, url, params=None, data=None, headers=None, cookies=None, files=None, auth=None, timeout=None, allow_redirects=True, proxies=None, hooks=None, stream=None, verify=None, cert=None, json=None): for i in range(3): try: return super(SafeSession, self).request(method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json) except Exception as e: print e.message, traceback.format_exc() continue class WXBot: """WXBot功能类""" def __init__(self): self.DEBUG = False self.SCHEDULE_INTV = 5 self.uuid = '' self.base_uri = '' self.redirect_uri = '' self.uin = '' self.sid = '' self.skey = '' self.pass_ticket = '' self.device_id = 'e' + repr(random.random())[2:17] self.base_request = {} self.sync_key_str = '' self.sync_key = [] self.sync_host = '' #文件缓存目录 self.temp_pwd = os.path.join(os.getcwd(), 'temp') if os.path.exists(self.temp_pwd) == False: os.makedirs(self.temp_pwd) self.session = SafeSession() self.session.headers.update({'User-Agent': 'Mozilla/5.0 (X11; Linux i686; U;) Gecko/20070322 Kazehakase/0.4.5'}) self.conf = {'qr': 'png'} self.my_account = {} # 当前账户 # 所有相关账号: 联系人, 公众号, 群组, 特殊账号 self.member_list = [] # 所有群组的成员, {'group_id1': [member1, member2, ...], ...} self.group_members = {} # 所有账户, {'group_member':{'id':{'type':'group_member', 'info':{}}, ...}, 'normal_member':{'id':{}, ...}} self.account_info = {'group_member': {}, 'normal_member': {}} self.contact_list = [] # 联系人列表 self.public_list = [] # 公众账号列表 self.group_list = [] # 群聊列表 self.special_list = [] # 特殊账号列表 self.encry_chat_room_id_list = [] # 存储群聊的EncryChatRoomId,获取群内成员头像时需要用到 self.file_index = 0 # 文件上传序号 self.msg_queue = Queue() # 消息处理队列,handle_msg_all是从此队列拿消息的 self.msg_thread = None self.schedule_thread = None self.inner_proc_thread = None @staticmethod def to_unicode(string, encoding='utf-8'): """ 将字符串转换为Unicode :param string: 待转换字符串 :param encoding: 字符串解码方式 :return: 转换后的Unicode字符串 """ if isinstance(string, str): return string.decode(encoding) elif isinstance(string, unicode): return string else: raise Exception('Unknown Type') def get_contact(self): """获取当前账户的所有相关账号(包括联系人、公众号、群聊、特殊账号)""" url = self.base_uri + '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' \ % (self.pass_ticket, self.skey, int(time.time())) r = self.session.post(url, data='{}') r.encoding = 'utf-8' if self.DEBUG: with open(os.path.join(self.temp_pwd,'contacts.json'), 'w') as f: f.write(r.text.encode('utf-8')) dic = json.loads(r.text) self.member_list = dic['MemberList'] special_users = ['newsapp', 'fmessage', 'filehelper', 'weibo', 'qqmail', 'fmessage', 'tmessage', 'qmessage', 'qqsync', 'floatbottle', 'lbsapp', 'shakeapp', 'medianote', 'qqfriend', 'readerapp', 'blogapp', 'facebookapp', 'masssendapp', 'meishiapp', 'feedsapp', 'voip', 'blogappweixin', 'weixin', 'brandsessionholder', 'weixinreminder', 'wxid_novlwrv3lqwv11', 'gh_22b87fa7cb3c', 'officialaccounts', 'notification_messages', 'wxid_novlwrv3lqwv11', 'gh_22b87fa7cb3c', 'wxitil', 'userexperience_alarm', 'notification_messages'] self.contact_list = [] self.public_list = [] self.special_list = [] self.group_list = [] for contact in self.member_list: if contact['VerifyFlag'] & 8 != 0: # 公众号 self.public_list.append(contact) self.account_info['normal_member'][contact['UserName']] = {'type': 'public', 'info': contact} elif contact['UserName'] in special_users: # 特殊账户 self.special_list.append(contact) self.account_info['normal_member'][contact['UserName']] = {'type': 'special', 'info': contact} elif contact['UserName'].find('@@') != -1: # 群聊 self.group_list.append(contact) self.account_info['normal_member'][contact['UserName']] = {'type': 'group', 'info': contact} elif contact['UserName'] == self.my_account['UserName']: # 自己 self.account_info['normal_member'][contact['UserName']] = {'type': 'self', 'info': contact} else: self.contact_list.append(contact) self.account_info['normal_member'][contact['UserName']] = {'type': 'contact', 'info': contact} self.batch_get_group_members() for group in self.group_members: for member in self.group_members[group]: if member['UserName'] not in self.account_info: self.account_info['group_member'][member['UserName']] = \ {'type': 'group_member', 'info': member, 'group': group} return True def batch_get_group_members(self): """批量获取所有群聊成员信息""" url = self.base_uri + '/webwxbatchgetcontact?type=ex&r=%s&pass_ticket=%s' % (int(time.time()), self.pass_ticket) params = { 'BaseRequest': self.base_request, "Count": len(self.group_list), "List": [{"UserName": group['UserName'], "EncryChatRoomId": ""} for group in self.group_list] } r = self.session.post(url, data=json.dumps(params)) r.encoding = 'utf-8' dic = json.loads(r.text) group_members = {} encry_chat_room_id = {} for group in dic['ContactList']: gid = group['UserName'] members = group['MemberList'] group_members[gid] = members encry_chat_room_id[gid] = group['EncryChatRoomId'] self.group_members = group_members self.encry_chat_room_id_list = encry_chat_room_id def get_group_member_name(self, gid, uid): """ 获取群聊中指定成员的名称信息 :param gid: 群id :param uid: 群聊成员id :return: 名称信息,类似 {"display_name": "test_user", "nickname": "test", "remark_name": "for_test" } """ if gid not in self.group_members: return None group = self.group_members[gid] for member in group: if member['UserName'] == uid: names = {} if 'RemarkName' in member and member['RemarkName']: names['remark_name'] = member['RemarkName'] if 'NickName' in member and member['NickName']: names['nickname'] = member['NickName'] if 'DisplayName' in member and member['DisplayName']: names['display_name'] = member['DisplayName'] return names return None def get_contact_info(self, uid): return self.account_info['normal_member'].get(uid) def get_group_member_info(self, uid): return self.account_info['group_member'].get(uid) def get_contact_name(self, uid): info = self.get_contact_info(uid) if info is None: return None info = info['info'] name = {} if 'RemarkName' in info and info['RemarkName']: name['remark_name'] = info['RemarkName'] if 'NickName' in info and info['NickName']: name['nickname'] = info['NickName'] if 'DisplayName' in info and info['DisplayName']: name['display_name'] = info['DisplayName'] if len(name) == 0: return None else: return name @staticmethod def get_contact_prefer_name(name): if name is None: return None if 'remark_name' in name: return name['remark_name'] if 'nickname' in name: return name['nickname'] if 'display_name' in name: return name['display_name'] return None @staticmethod def get_group_member_prefer_name(name): if name is None: return None if 'remark_name' in name: return name['remark_name'] if 'display_name' in name: return name['display_name'] if 'nickname' in name: return name['nickname'] return None def get_user_type(self, wx_user_id): """ 获取特定账号与自己的关系 :param wx_user_id: 账号id: :return: 与当前账号的关系 """ for account in self.contact_list: if wx_user_id == account['UserName']: return 'contact' for account in self.public_list: if wx_user_id == account['UserName']: return 'public' for account in self.special_list: if wx_user_id == account['UserName']: return 'special' for account in self.group_list: if wx_user_id == account['UserName']: return 'group' for group in self.group_members: for member in self.group_members[group]: if member['UserName'] == wx_user_id: return 'group_member' return 'unknown' def is_contact(self, uid): for account in self.contact_list: if uid == account['UserName']: return True return False def is_public(self, uid): for account in self.public_list: if uid == account['UserName']: return True return False def is_special(self, uid): for account in self.special_list: if uid == account['UserName']: return True return False def handle_msg_all(self, msg): """ 处理所有消息,请子类化后覆盖此函数 msg: msg_id -> 消息id msg_type_id -> 消息类型id user -> 发送消息的账号id content -> 消息内容 :param msg: 收到的消息 """ pass @staticmethod def proc_at_info(msg): if not msg: return '', [] segs = msg.split(u'\u2005') str_msg_all = '' str_msg = '' infos = [] if len(segs) > 1: for i in range(0, len(segs) - 1): segs[i] += u'\u2005' pm = re.search(u'@.*\u2005', segs[i]).group() if pm: name = pm[1:-1] string = segs[i].replace(pm, '') str_msg_all += string + '@' + name + ' ' str_msg += string if string: infos.append({'type': 'str', 'value': string}) infos.append({'type': 'at', 'value': name}) else: infos.append({'type': 'str', 'value': segs[i]}) str_msg_all += segs[i] str_msg += segs[i] str_msg_all += segs[-1] str_msg += segs[-1] infos.append({'type': 'str', 'value': segs[-1]}) else: infos.append({'type': 'str', 'value': segs[-1]}) str_msg_all = msg str_msg = msg return str_msg_all.replace(u'\u2005', ''), str_msg.replace(u'\u2005', ''), infos def extract_msg_content(self, msg_type_id, msg): """ content_type_id: 0 -> Text 1 -> Location 3 -> Image 4 -> Voice 5 -> Recommend 6 -> Animation 7 -> Share 8 -> Video 9 -> VideoCall 10 -> Redraw 11 -> Empty 99 -> Unknown :param msg_type_id: 消息类型id :param msg: 消息结构体 :return: 解析的消息 """ mtype = msg['MsgType'] content = HTMLParser.HTMLParser().unescape(msg['Content']) msg_id = msg['MsgId'] msg_content = {} if msg_type_id == 0: return {'type': 11, 'data': ''} elif msg_type_id == 2: # File Helper return {'type': 0, 'data': content.replace('
', '\n')} elif msg_type_id == 3: # 群聊 sp = content.find('
') uid = content[:sp] content = content[sp:] content = content.replace('
', '') uid = uid[:-1] name = self.get_contact_prefer_name(self.get_contact_name(uid)) if not name: name = self.get_group_member_prefer_name(self.get_group_member_name(msg['FromUserName'], uid)) if not name: name = 'unknown' msg_content['user'] = {'id': uid, 'name': name} else: # Self, Contact, Special, Public, Unknown pass msg_prefix = (msg_content['user']['name'] + ':') if 'user' in msg_content else '' if mtype == 1: if content.find('http://weixin.qq.com/cgi-bin/redirectforward?args=') != -1: r = self.session.get(content) r.encoding = 'gbk' data = r.text pos = self.search_content('title', data, 'xml') msg_content['type'] = 1 msg_content['data'] = pos msg_content['detail'] = data if self.DEBUG: print ' %s[Location] %s ' % (msg_prefix, pos) else: msg_content['type'] = 0 if msg_type_id == 3 or (msg_type_id == 1 and msg['ToUserName'][:2] == '@@'): # Group text message msg_infos = self.proc_at_info(content) str_msg_all = msg_infos[0] str_msg = msg_infos[1] detail = msg_infos[2] msg_content['data'] = str_msg_all msg_content['detail'] = detail msg_content['desc'] = str_msg else: msg_content['data'] = content if self.DEBUG: try: print ' %s[Text] %s' % (msg_prefix, msg_content['data']) except UnicodeEncodeError: print ' %s[Text] (illegal text).' % msg_prefix elif mtype == 3: msg_content['type'] = 3 msg_content['data'] = self.get_msg_img_url(msg_id) msg_content['img'] = self.session.get(msg_content['data']).content.encode('hex') if self.DEBUG: image = self.get_msg_img(msg_id) print ' %s[Image] %s' % (msg_prefix, image) elif mtype == 34: msg_content['type'] = 4 msg_content['data'] = self.get_voice_url(msg_id) msg_content['voice'] = self.session.get(msg_content['data']).content.encode('hex') if self.DEBUG: voice = self.get_voice(msg_id) print ' %s[Voice] %s' % (msg_prefix, voice) elif mtype == 37: msg_content['type'] = 37 msg_content['data'] = msg['RecommendInfo'] if self.DEBUG: print ' %s[useradd] %s' % (msg_prefix,msg['RecommendInfo']['NickName']) elif mtype == 42: msg_content['type'] = 5 info = msg['RecommendInfo'] msg_content['data'] = {'nickname': info['NickName'], 'alias': info['Alias'], 'province': info['Province'], 'city': info['City'], 'gender': ['unknown', 'male', 'female'][info['Sex']]} if self.DEBUG: print ' %s[Recommend]' % msg_prefix print ' -----------------------------' print ' | NickName: %s' % info['NickName'] print ' | Alias: %s' % info['Alias'] print ' | Local: %s %s' % (info['Province'], info['City']) print ' | Gender: %s' % ['unknown', 'male', 'female'][info['Sex']] print ' -----------------------------' elif mtype == 47: msg_content['type'] = 6 msg_content['data'] = self.search_content('cdnurl', content) if self.DEBUG: print ' %s[Animation] %s' % (msg_prefix, msg_content['data']) elif mtype == 49: msg_content['type'] = 7 if msg['AppMsgType'] == 3: app_msg_type = 'music' elif msg['AppMsgType'] == 5: app_msg_type = 'link' elif msg['AppMsgType'] == 7: app_msg_type = 'weibo' else: app_msg_type = 'unknown' msg_content['data'] = {'type': app_msg_type, 'title': msg['FileName'], 'desc': self.search_content('des', content, 'xml'), 'url': msg['Url'], 'from': self.search_content('appname', content, 'xml'), 'content': msg.get('Content') # 有的公众号会发一次性3 4条链接一个大图,如果只url那只能获取第一条,content里面有所有的链接 } if self.DEBUG: print ' %s[Share] %s' % (msg_prefix, app_msg_type) print ' --------------------------' print ' | title: %s' % msg['FileName'] print ' | desc: %s' % self.search_content('des', content, 'xml') print ' | link: %s' % msg['Url'] print ' | from: %s' % self.search_content('appname', content, 'xml') print ' | content: %s' % (msg.get('content')[:20] if msg.get('content') else "unknown") print ' --------------------------' elif mtype == 62: msg_content['type'] = 8 msg_content['data'] = content if self.DEBUG: print ' %s[Video] Please check on mobiles' % msg_prefix elif mtype == 53: msg_content['type'] = 9 msg_content['data'] = content if self.DEBUG: print ' %s[Video Call]' % msg_prefix elif mtype == 10002: msg_content['type'] = 10 msg_content['data'] = content if self.DEBUG: print ' %s[Redraw]' % msg_prefix elif mtype == 10000: # unknown, maybe red packet, or group invite msg_content['type'] = 12 msg_content['data'] = msg['Content'] if self.DEBUG: print ' [Unknown]' else: msg_content['type'] = 99 msg_content['data'] = content if self.DEBUG: print ' %s[Unknown]' % msg_prefix return msg_content def handle_msg(self, r): """ 处理原始微信消息的内部函数 msg_type_id: 0 -> Init 1 -> Self 2 -> FileHelper 3 -> Group 4 -> Contact 5 -> Public 6 -> Special 99 -> Unknown :param r: 原始微信消息 """ for msg in r['AddMsgList']: user = {'id': msg['FromUserName'], 'name': 'unknown'} if msg['MsgType'] == 51: # init message msg_type_id = 0 user['name'] = 'system' elif msg['MsgType'] == 37: # friend request msg_type_id = 37 pass # content = msg['Content'] # username = content[content.index('fromusername='): content.index('encryptusername')] # username = username[username.index('"') + 1: username.rindex('"')] # print u'[Friend Request]' # print u' Nickname:' + msg['RecommendInfo']['NickName'] # print u' 附加消息:'+msg['RecommendInfo']['Content'] # # print u'Ticket:'+msg['RecommendInfo']['Ticket'] # Ticket添加好友时要用 # print u' 微信号:'+username #未设置微信号的 腾讯会自动生成一段微信ID 但是无法通过搜索 搜索到此人 elif msg['FromUserName'] == self.my_account['UserName']: # Self msg_type_id = 1 user['name'] = 'self' elif msg['ToUserName'] == 'filehelper': # File Helper msg_type_id = 2 user['name'] = 'file_helper' elif msg['FromUserName'][:2] == '@@': # Group msg_type_id = 3 user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id'])) elif self.is_contact(msg['FromUserName']): # Contact msg_type_id = 4 user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id'])) elif self.is_public(msg['FromUserName']): # Public msg_type_id = 5 user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id'])) elif self.is_special(msg['FromUserName']): # Special msg_type_id = 6 user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id'])) else: msg_type_id = 99 user['name'] = 'unknown' if not user['name']: user['name'] = 'unknown' user['name'] = HTMLParser.HTMLParser().unescape(user['name']) if self.DEBUG and msg_type_id != 0: print u'[MSG] %s:' % user['name'] content = self.extract_msg_content(msg_type_id, msg) message = {'msg_type_id': msg_type_id, 'msg_id': msg['MsgId'], 'content': content, 'to_user_id': msg['ToUserName'], 'user': user} self.msg_queue.put(message) def schedule(self): """ 做任务型事情的函数,如果需要,可以在子类中覆盖此函数 此函数在处理消息的间隙被调用,请不要长时间阻塞此函数 """ pass def proc_msg(self): while True: check_time = time.time() try: [retcode, selector] = self.sync_check() # print '[DEBUG] sync_check:', retcode, selector if retcode == '1100': # 从微信客户端上登出 break elif retcode == '1101': # 从其它设备上登了网页微信 break elif retcode == '0': if selector == '2': # 有新消息 r = self.sync() if r is not None: self.handle_msg(r) elif selector == '3': # 未知 r = self.sync() if r is not None: self.handle_msg(r) elif selector == '4': # 通讯录更新 r = self.sync() if r is not None: self.get_contact() elif selector == '6': # 可能是红包 r = self.sync() if r is not None: self.handle_msg(r) elif selector == '7': # 在手机上操作了微信 r = self.sync() if r is not None: self.handle_msg(r) elif selector == '0': # 无事件 pass else: print '[DEBUG] sync_check:', retcode, selector r = self.sync() if r is not None: self.handle_msg(r) else: print '[DEBUG] sync_check:', retcode, selector except: print '[ERROR] Except in proc_msg' print format_exc() check_time = time.time() - check_time if check_time < 0.8: time.sleep(1 - check_time) def msg_thread_proc(self): print '[INFO] Msg thread start' while True: if not self.msg_queue.empty(): msg = self.msg_queue.get() self.handle_msg_all(msg) else: time.sleep(0.1) def schedule_thread_proc(self): print '[INFO] Schedule thread start' while True: check_time = time.time() self.schedule() check_time = time.time() - check_time if check_time < self.SCHEDULE_INTV: time.sleep(self.SCHEDULE_INTV - check_time) def login_and_init_with_restore(self): return self.restore_login_result() def login_and_init_with_qr(self): self.get_uuid() self.gen_qr_code(os.path.join(self.temp_pwd, 'wxqr.png')) print '[INFO] Please use WeChat to scan the QR code .' result = self.wait4login() if result != SUCCESS: print '[ERROR] Web WeChat login failed. failed code=%s' % (result,) return False if self.login(): print '[INFO] Web WeChat login succeed .' else: print '[ERROR] Web WeChat login failed .' return False if self.init(): print '[INFO] Web WeChat init succeed .' else: print '[INFO] Web WeChat init failed' return False self.status_notify() self.get_contact() self.test_sync_check() self.save_login_result() return True def run_inner(self): print '[INFO] Get %d contacts' % len(self.contact_list) print '[INFO] Start to process messages .' self.proc_msg() def run(self): if not self.login_and_init_with_restore(): print '[INFO] Restore login failed !' if not self.login_and_init_with_qr(): print '[ERROR] Login and init failed !' return else: print '[INFO Restore login succeed .' self.msg_thread = threading.Thread(target=self.msg_thread_proc) self.msg_thread.setDaemon(True) self.msg_thread.start() self.schedule_thread = threading.Thread(target=self.schedule_thread_proc) self.schedule_thread.setDaemon(True) self.schedule_thread.start() self.inner_proc_thread = threading.Thread(target=self.run_inner) self.inner_proc_thread.setDaemon(True) self.inner_proc_thread.start() self.inner_proc_thread.join() while True: if self.login_and_init_with_restore(): self.inner_proc_thread = threading.Thread(target=self.run_inner) self.inner_proc_thread.setDaemon(True) self.inner_proc_thread.start() self.inner_proc_thread.join() else: print '[ERROR] Try to restore from file failed !' return def apply_useradd_requests(self,RecommendInfo): url = self.base_uri + '/webwxverifyuser?r='+str(int(time.time()))+'&lang=zh_CN' params = { "BaseRequest": self.base_request, "Opcode": 3, "VerifyUserListSize": 1, "VerifyUserList": [ { "Value": RecommendInfo['UserName'], "VerifyUserTicket": RecommendInfo['Ticket'] } ], "VerifyContent": "", "SceneListCount": 1, "SceneList": [ 33 ], "skey": self.skey } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0 def add_groupuser_to_friend_by_uid(self, uid, VerifyContent): """ 主动向群内人员打招呼,提交添加好友请求 uid-群内人员得uid VerifyContent-好友招呼内容 慎用此接口!封号后果自负!慎用此接口!封号后果自负!慎用此接口!封号后果自负! """ if self.is_contact(uid): return True url = self.base_uri + '/webwxverifyuser?r='+str(int(time.time()))+'&lang=zh_CN' params ={ "BaseRequest": self.base_request, "Opcode": 2, "VerifyUserListSize": 1, "VerifyUserList": [ { "Value": uid, "VerifyUserTicket": "" } ], "VerifyContent": VerifyContent, "SceneListCount": 1, "SceneList": [ 33 ], "skey": self.skey } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0 def add_friend_to_group(self,uid,group_name): """ 将好友加入到群聊中 """ gid = '' #通过群名获取群id,群没保存到通讯录中的话无法添加哦 for group in self.group_list: if group['NickName'] == group_name: gid = group['UserName'] if gid == '': return False #通过群id判断uid是否在群中 for user in self.group_members[gid]: if user['UserName'] == uid: #已经在群里面了,不用加了 return True url = self.base_uri + '/webwxupdatechatroom?fun=addmember&pass_ticket=%s' % self.pass_ticket params ={ "AddMemberList": uid, "ChatRoomName": gid, "BaseRequest": self.base_request } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0 def delete_user_from_group(self,uname,gid): """ 将群用户从群中剔除,只有群管理员有权限 """ uid = "" for user in self.group_members[gid]: if user['NickName'] == uname: uid = user['UserName'] if uid == "": return False url = self.base_uri + '/webwxupdatechatroom?fun=delmember&pass_ticket=%s' % self.pass_ticket params ={ "DelMemberList": uid, "ChatRoomName": gid, "BaseRequest": self.base_request } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0 def send_msg_by_uid(self, word, dst='filehelper'): url = self.base_uri + '/webwxsendmsg?pass_ticket=%s' % self.pass_ticket msg_id = str(int(time.time() * 1000)) + str(random.random())[:5].replace('.', '') word = self.to_unicode(word) params = { 'BaseRequest': self.base_request, 'Msg': { "Type": 1, "Content": word, "FromUserName": self.my_account['UserName'], "ToUserName": dst, "LocalID": msg_id, "ClientMsgId": msg_id } } headers = {'content-type': 'application/json; charset=UTF-8'} data = json.dumps(params, ensure_ascii=False).encode('utf8') try: r = self.session.post(url, data=data, headers=headers) except (ConnectionError, ReadTimeout): return False dic = r.json() return dic['BaseResponse']['Ret'] == 0 def upload_media(self, fpath, is_img=False): if not os.path.exists(fpath): print '[ERROR] File not exists.' return None url_1 = 'https://file.wx2.qq.com/cgi-bin/mmwebwx-bin/webwxuploadmedia?f=json' url_2 = 'https://file2.wx2.qq.com/cgi-bin/mmwebwx-bin/webwxuploadmedia?f=json' flen = str(os.path.getsize(fpath)) ftype = mimetypes.guess_type(fpath)[0] or 'application/octet-stream' files = { 'id': (None, 'WU_FILE_%s' % str(self.file_index)), 'name': (None, os.path.basename(fpath)), 'type': (None, ftype), 'lastModifiedDate': (None, time.strftime('%m/%d/%Y, %H:%M:%S GMT+0800 (CST)')), 'size': (None, flen), 'mediatype': (None, 'pic' if is_img else 'doc'), 'uploadmediarequest': (None, json.dumps({ 'BaseRequest': self.base_request, 'ClientMediaId': int(time.time()), 'TotalLen': flen, 'StartPos': 0, 'DataLen': flen, 'MediaType': 4, })), 'webwx_data_ticket': (None, self.session.cookies['webwx_data_ticket']), 'pass_ticket': (None, self.pass_ticket), 'filename': (os.path.basename(fpath), open(fpath, 'rb'),ftype.split('/')[1]), } self.file_index += 1 try: r = self.session.post(url_1, files=files) if json.loads(r.text)['BaseResponse']['Ret'] != 0: # 当file返回值不为0时则为上传失败,尝试第二服务器上传 r = self.session.post(url_2, files=files) if json.loads(r.text)['BaseResponse']['Ret'] != 0: print '[ERROR] Upload media failure.' return None mid = json.loads(r.text)['MediaId'] return mid except Exception,e: return None def send_file_msg_by_uid(self, fpath, uid): mid = self.upload_media(fpath) if mid is None or not mid: return False url = self.base_uri + '/webwxsendappmsg?fun=async&f=json&pass_ticket=' + self.pass_ticket msg_id = str(int(time.time() * 1000)) + str(random.random())[:5].replace('.', '') data = { 'BaseRequest': self.base_request, 'Msg': { 'Type': 6, 'Content': ("%s6%s%s%s" % (os.path.basename(fpath).encode('utf-8'), str(os.path.getsize(fpath)), mid, fpath.split('.')[-1])).encode('utf8'), 'FromUserName': self.my_account['UserName'], 'ToUserName': uid, 'LocalID': msg_id, 'ClientMsgId': msg_id, }, } try: r = self.session.post(url, data=json.dumps(data)) res = json.loads(r.text) if res['BaseResponse']['Ret'] == 0: return True else: return False except Exception,e: return False def send_img_msg_by_uid(self, fpath, uid): mid = self.upload_media(fpath, is_img=True) if mid is None: return False url = self.base_uri + '/webwxsendmsgimg?fun=async&f=json' data = { 'BaseRequest': self.base_request, 'Msg': { 'Type': 3, 'MediaId': mid, 'FromUserName': self.my_account['UserName'], 'ToUserName': uid, 'LocalID': str(time.time() * 1e7), 'ClientMsgId': str(time.time() * 1e7), }, } if fpath[-4:] == '.gif': url = self.base_uri + '/webwxsendemoticon?fun=sys' data['Msg']['Type'] = 47 data['Msg']['EmojiFlag'] = 2 try: r = self.session.post(url, data=json.dumps(data)) res = json.loads(r.text) if res['BaseResponse']['Ret'] == 0: return True else: return False except Exception,e: return False def get_user_id(self, name): if name == '': return None name = self.to_unicode(name) for contact in self.contact_list: if 'RemarkName' in contact and contact['RemarkName'] == name: return contact['UserName'] elif 'NickName' in contact and contact['NickName'] == name: return contact['UserName'] elif 'DisplayName' in contact and contact['DisplayName'] == name: return contact['UserName'] for group in self.group_list: if 'RemarkName' in group and group['RemarkName'] == name: return group['UserName'] if 'NickName' in group and group['NickName'] == name: return group['UserName'] if 'DisplayName' in group and group['DisplayName'] == name: return group['UserName'] return '' def send_msg(self, name, word, isfile=False): uid = self.get_user_id(name) if uid is not None: if isfile: with open(word, 'r') as f: result = True for line in f.readlines(): line = line.replace('\n', '') print '-> ' + name + ': ' + line if self.send_msg_by_uid(line, uid): pass else: result = False time.sleep(1) return result else: word = self.to_unicode(word) if self.send_msg_by_uid(word, uid): return True else: return False else: if self.DEBUG: print '[ERROR] This user does not exist .' return True @staticmethod def search_content(key, content, fmat='attr'): if fmat == 'attr': pm = re.search(key + '\s?=\s?"([^"<]+)"', content) if pm: return pm.group(1) elif fmat == 'xml': pm = re.search('<{0}>([^<]+)'.format(key), content) if pm: return pm.group(1) return 'unknown' def get_uuid(self): url = 'https://login.weixin.qq.com/jslogin' params = { 'appid': 'wx782c26e4c19acffb', 'fun': 'new', 'lang': 'zh_CN', '_': int(time.time()) * 1000 + random.randint(1, 999), } r = self.session.get(url, params=params) r.encoding = 'utf-8' data = r.text regx = r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)"' pm = re.search(regx, data) if pm: code = pm.group(1) self.uuid = pm.group(2) return code == '200' return False def gen_qr_code(self, qr_file_path): string = 'https://login.weixin.qq.com/l/' + self.uuid qr = pyqrcode.create(string) if self.conf['qr'] == 'png': qr.png(qr_file_path, scale=8) show_image(qr_file_path) # img = Image.open(qr_file_path) # img.show() elif self.conf['qr'] == 'tty': print(qr.terminal(quiet_zone=1)) def do_request(self, url): r = self.session.get(url) r.encoding = 'utf-8' data = r.text param = re.search(r'window.code=(\d+);', data) code = param.group(1) return code, data def wait4login(self): """ http comet: tip=1, 等待用户扫描二维码, 201: scaned 408: timeout tip=0, 等待用户确认登录, 200: confirmed """ LOGIN_TEMPLATE = 'https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?tip=%s&uuid=%s&_=%s' tip = 1 try_later_secs = 1 MAX_RETRY_TIMES = 10 code = UNKONWN retry_time = MAX_RETRY_TIMES while retry_time > 0: url = LOGIN_TEMPLATE % (tip, self.uuid, int(time.time())) code, data = self.do_request(url) if code == SCANED: print '[INFO] Please confirm to login .' tip = 0 elif code == SUCCESS: # 确认登录成功 param = re.search(r'window.redirect_uri="(\S+?)";', data) redirect_uri = param.group(1) + '&fun=new' self.redirect_uri = redirect_uri self.base_uri = redirect_uri[:redirect_uri.rfind('/')] return code elif code == TIMEOUT: print '[ERROR] WeChat login timeout. retry in %s secs later...' % (try_later_secs,) tip = 1 # 重置 retry_time -= 1 time.sleep(try_later_secs) else: print ('[ERROR] WeChat login exception return_code=%s. retry in %s secs later...' % (code, try_later_secs)) tip = 1 retry_time -= 1 time.sleep(try_later_secs) return code def save_dict_to_file(self, dic, fname): with open(os.path.join(self.temp_pwd, fname), 'w') as f: f.write(json.dumps(dic)) def restore_dict_from_file(self, fname): with open(os.path.join(self.temp_pwd, fname), 'r') as f: fstr = f.read() return json.loads(fstr) def save_login_result(self): result = {} result['skey'] = self.skey result['sid'] = self.sid result['uin'] = self.uin result['pass_ticket'] = self.pass_ticket result['base_request'] = self.base_request result['redirect_uri'] = self.redirect_uri result['base_uri'] = self.base_uri result['sync_key'] = self.sync_key result['sync_key_str'] = self.sync_key_str result['my_account'] = self.my_account result['sync_host'] = self.sync_host with open(os.path.join(self.temp_pwd, "login_result.json"), 'w') as f: f.write(json.dumps(result)) self.save_dict_to_file(self.contact_list, 'contact_list.json') self.save_dict_to_file(self.special_list, 'special_list.json') self.save_dict_to_file(self.group_list, 'group_list.json') self.save_dict_to_file(self.public_list, 'public_list.json') self.save_dict_to_file(self.member_list, 'member_list.json') self.save_dict_to_file(self.group_members, 'group_members.json') self.save_dict_to_file(self.account_info, 'account_info.json') self.save_dict_to_file(self.encry_chat_room_id_list, 'encry_chat_room_id_list.json') pickle.dump(self.session, open(os.path.join(self.temp_pwd, "session.json"), "w")) def restore_login_result(self): try: with open(os.path.join(self.temp_pwd, "login_result.json"), 'r') as f: login_str = f.read() result = json.loads(login_str) self.skey = result['skey'] self.sid = result['sid'] self.uin = result['uin'] self.pass_ticket = result['pass_ticket'] self.base_request = result['base_request'] self.redirect_uri = result['redirect_uri'] self.base_uri = result['base_uri'] self.sync_key = result['sync_key'] self.sync_key_str = result['sync_key_str'] self.my_account = result['my_account'] self.sync_host = result['sync_host'] self.contact_list = self.restore_dict_from_file('contact_list.json') self.special_list = self.restore_dict_from_file('special_list.json') self.group_list = self.restore_dict_from_file('group_list.json') self.public_list = self.restore_dict_from_file('public_list.json') self.member_list = self.restore_dict_from_file('member_list.json') self.group_members = self.restore_dict_from_file('group_members.json') self.account_info = self.restore_dict_from_file('account_info.json') self.encry_chat_room_id_list = self.restore_dict_from_file('encry_chat_room_id_list.json') self.session = pickle.load(open(os.path.join(self.temp_pwd, "session.json"), "r")) return True except Exception, e: print format_exc() def login(self): if len(self.redirect_uri) < 4: print '[ERROR] Login failed due to network problem, please try again.' return False r = self.session.get(self.redirect_uri) r.encoding = 'utf-8' data = r.text doc = xml.dom.minidom.parseString(data) root = doc.documentElement for node in root.childNodes: if node.nodeName == 'skey': self.skey = node.childNodes[0].data elif node.nodeName == 'wxsid': self.sid = node.childNodes[0].data elif node.nodeName == 'wxuin': self.uin = node.childNodes[0].data elif node.nodeName == 'pass_ticket': self.pass_ticket = node.childNodes[0].data if '' in (self.skey, self.sid, self.uin, self.pass_ticket): return False self.base_request = { 'Uin': self.uin, 'Sid': self.sid, 'Skey': self.skey, 'DeviceID': self.device_id, } return True def init(self): url = self.base_uri + '/webwxinit?r=%i&lang=en_US&pass_ticket=%s' % (int(time.time()), self.pass_ticket) params = { 'BaseRequest': self.base_request } r = self.session.post(url, data=json.dumps(params)) r.encoding = 'utf-8' dic = json.loads(r.text) self.sync_key = dic['SyncKey'] self.my_account = dic['User'] self.sync_key_str = '|'.join([str(keyVal['Key']) + '_' + str(keyVal['Val']) for keyVal in self.sync_key['List']]) return dic['BaseResponse']['Ret'] == 0 def status_notify(self): url = self.base_uri + '/webwxstatusnotify?lang=zh_CN&pass_ticket=%s' % self.pass_ticket self.base_request['Uin'] = int(self.base_request['Uin']) params = { 'BaseRequest': self.base_request, "Code": 3, "FromUserName": self.my_account['UserName'], "ToUserName": self.my_account['UserName'], "ClientMsgId": int(time.time()) } r = self.session.post(url, data=json.dumps(params)) r.encoding = 'utf-8' dic = json.loads(r.text) return dic['BaseResponse']['Ret'] == 0 def test_sync_check(self): for host in ['webpush.wx', 'webpush2.wx', 'webpush.weixin', 'webpush2.weixin2', ]: self.sync_host = host retcode = self.sync_check()[0] if retcode == '0': return True return False def sync_check(self): params = { 'r': int(time.time()), 'sid': self.sid, 'uin': self.uin, 'skey': self.skey, 'deviceid': self.device_id, 'synckey': self.sync_key_str, '_': int(time.time()), } url = 'https://' + self.sync_host + '.qq.com/cgi-bin/mmwebwx-bin/synccheck?' + urllib.urlencode(params) try: r = self.session.get(url, timeout=60) r.encoding = 'utf-8' data = r.text pm = re.search(r'window.synccheck=\{retcode:"(\d+)",selector:"(\d+)"\}', data) retcode = pm.group(1) selector = pm.group(2) return [retcode, selector] except: return [-1, -1] def sync(self): url = self.base_uri + '/webwxsync?sid=%s&skey=%s&lang=en_US&pass_ticket=%s' \ % (self.sid, self.skey, self.pass_ticket) params = { 'BaseRequest': self.base_request, 'SyncKey': self.sync_key, 'rr': ~int(time.time()) } try: r = self.session.post(url, data=json.dumps(params), timeout=60) r.encoding = 'utf-8' dic = json.loads(r.text) if dic['BaseResponse']['Ret'] == 0: self.sync_key = dic['SyncKey'] self.sync_key_str = '|'.join([str(keyVal['Key']) + '_' + str(keyVal['Val']) for keyVal in self.sync_key['List']]) return dic except: return None def get_icon(self, uid, gid=None): """ 获取联系人或者群聊成员头像 :param uid: 联系人id :param gid: 群id,如果为非None获取群中成员头像,如果为None则获取联系人头像 """ if gid is None: url = self.base_uri + '/webwxgeticon?username=%s&skey=%s' % (uid, self.skey) else: url = self.base_uri + '/webwxgeticon?username=%s&skey=%s&chatroomid=%s' % ( uid, self.skey, self.encry_chat_room_id_list[gid]) r = self.session.get(url) data = r.content fn = 'icon_' + uid + '.jpg' with open(os.path.join(self.temp_pwd,fn), 'wb') as f: f.write(data) return fn def get_head_img(self, uid): """ 获取群头像 :param uid: 群uid """ url = self.base_uri + '/webwxgetheadimg?username=%s&skey=%s' % (uid, self.skey) r = self.session.get(url) data = r.content fn = 'head_' + uid + '.jpg' with open(os.path.join(self.temp_pwd,fn), 'wb') as f: f.write(data) return fn def get_msg_img_url(self, msgid): return self.base_uri + '/webwxgetmsgimg?MsgID=%s&skey=%s' % (msgid, self.skey) def get_msg_img(self, msgid): """ 获取图片消息,下载图片到本地 :param msgid: 消息id :return: 保存的本地图片文件路径 """ url = self.base_uri + '/webwxgetmsgimg?MsgID=%s&skey=%s' % (msgid, self.skey) r = self.session.get(url) data = r.content fn = 'img_' + msgid + '.jpg' with open(os.path.join(self.temp_pwd,fn), 'wb') as f: f.write(data) return fn def get_voice_url(self, msgid): return self.base_uri + '/webwxgetvoice?msgid=%s&skey=%s' % (msgid, self.skey) def get_voice(self, msgid): """ 获取语音消息,下载语音到本地 :param msgid: 语音消息id :return: 保存的本地语音文件路径 """ url = self.base_uri + '/webwxgetvoice?msgid=%s&skey=%s' % (msgid, self.skey) r = self.session.get(url) data = r.content fn = 'voice_' + msgid + '.mp3' with open(os.path.join(self.temp_pwd,fn), 'wb') as f: f.write(data) return fn def set_remark_name(self, uid, name): # 设置联系人的备注名 url = self.base_uri + '/webwxoplog?lang=zh_CN&pass_ticket=%s' % self.pass_ticket remark_name = self.to_unicode(name) params = { 'BaseRequest': self.base_request, 'CmdId': 2, 'RemarkName': remark_name, 'UserName': uid } try: r = self.session.post(url, data=json.dumps(params), timeout=60) r.encoding = 'utf-8' dic = json.loads(r.text) return dic['BaseResponse']['ErrMsg'] except: return None