12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478 |
- #!/usr/bin/env python
- # coding: utf-8
- from __future__ import print_function
- import os
- import tempfile
- import webbrowser
- import binascii
- import pyqrcode
- import requests
- import mimetypes
- import json
- import xml.dom.minidom
- import time
- import re
- import random
- from requests.exceptions import ConnectionError, ReadTimeout
- import sys
- if sys.version_info[0] == 3:
- from html import parser as HTMLParser
- from urllib.parse import urlencode
- unicode = str
- else:
- import HTMLParser
- from urllib import urlencode
- import logging
- log = logging.getLogger('wxbot')
- UNKONWN = 'unkonwn'
- SUCCESS = '200'
- SCANED = '201'
- TIMEOUT = '408'
- def map_username_batch(user_name):
- return {"UserName": user_name, "EncryChatRoomId": ""}
- def show_image(file_path):
- """
- 跨平台显示图片文件
- :param file_path: 图片文件路径
- """
- if sys.version_info >= (3, 3):
- from shlex import quote
- else:
- from pipes import quote
- if sys.platform == "darwin":
- command = "open -a /Applications/Preview.app %s&" % quote(file_path)
- os.system(command)
- else:
- webbrowser.open(os.path.join(tempfile.gettempdir(), 'wxbot', file_path))
- class SafeSession(requests.Session):
- def request(self, method, url, params=None, data=None, headers=None, cookies=None, files=None, auth=None,
- timeout=None, allow_redirects=True, proxies=None, hooks=None, stream=None, verify=None, cert=None,
- json=None):
- for i in range(3):
- try:
- return super(SafeSession, self).request(method, url, params, data, headers, cookies, files, auth,
- timeout,
- allow_redirects, proxies, hooks, stream, verify, cert, json)
- except Exception as e:
- log.exception('request %s failed' % url)
- continue
- # 重试3次以后再加一次,抛出异常
- try:
- return super(SafeSession, self).request(method, url, params, data, headers, cookies, files, auth,
- timeout,
- allow_redirects, proxies, hooks, stream, verify, cert, json)
- except Exception as e:
- raise e
- class WXBot:
- """WXBot功能类"""
- def __init__(self):
- self.DEBUG = False
- self.uuid = ''
- self.base_uri = ''
- self.base_host = ''
- self.redirect_uri = ''
- self.uin = ''
- self.sid = ''
- self.skey = ''
- self.pass_ticket = ''
- self.device_id = 'e' + repr(random.random())[2:17]
- self.base_request = {}
- self.sync_key_str = ''
- self.sync_key = []
- self.sync_host = ''
- self.batch_count = 50 # 一次拉取50个联系人的信息
- self.full_user_name_list = [] # 直接获取不到通讯录时,获取的username列表
- self.wxid_list = [] # 获取到的wxid的列表
- self.cursor = 0 # 拉取联系人信息的游标
- self.is_big_contact = False # 通讯录人数过多,无法直接获取
- # 文件缓存目录
- self.temp_pwd = os.path.join(os.getcwd(), 'temp')
- if not os.path.exists(self.temp_pwd):
- log.info("Temp path not exists, create it: " + self.temp_pwd)
- os.makedirs(self.temp_pwd)
- else:
- log.info("Temp path exists: " + self.temp_pwd)
- self.session = SafeSession()
- self.session.headers.update({'User-Agent': 'Mozilla/5.0 (X11; Linux i686; U;) Gecko/20070322 Kazehakase/0.4.5'})
- self.conf = {'qr': 'png'}
- self.my_account = {} # 当前账户
- # 所有相关账号: 联系人, 公众号, 群组, 特殊账号
- self.member_list = []
- # 所有群组的成员, {'group_id1': [member1, member2, ...], ...}
- self.group_members = {}
- # 所有账户, {'group_member':{'id':{'type':'group_member', 'info':{}}, ...}, 'normal_member':{'id':{}, ...}}
- self.account_info = {'group_member': {}, 'normal_member': {}}
- self.contact_list = [] # 联系人列表
- self.public_list = [] # 公众账号列表
- self.group_list = [] # 群聊列表
- self.special_list = [] # 特殊账号列表
- self.encry_chat_room_id_list = [] # 存储群聊的EncryChatRoomId,获取群内成员头像时需要用到
- self.file_index = 0
- self.state_file = os.path.join(self.temp_pwd, 'session_state.json')
- @staticmethod
- def to_unicode(string, encoding='utf-8'):
- """
- 将字符串转换为Unicode
- :param string: 待转换字符串
- :param encoding: 字符串解码方式
- :return: 转换后的Unicode字符串
- """
- if isinstance(string, unicode):
- return string
- elif isinstance(string, str):
- return string.decode(encoding)
- else:
- raise Exception('Unknown Type')
- def get_contact(self):
- """获取当前账户的所有相关账号(包括联系人、公众号、群聊、特殊账号)"""
- if self.is_big_contact:
- return False
- url = self.base_uri + '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' \
- % (self.pass_ticket, self.skey, int(time.time()))
- # 如果通讯录联系人过多,这里会直接获取失败
- try:
- r = self.session.post(url, data='{}')
- except Exception as e:
- self.is_big_contact = True
- return False
- r.encoding = 'utf-8'
- if self.DEBUG:
- with open(os.path.join(self.temp_pwd,'contacts.json'), 'wb') as f:
- f.write(r.text.encode('utf-8'))
- dic = json.loads(r.text)
- self.member_list = dic['MemberList']
- special_users = ['newsapp', 'fmessage', 'filehelper', 'weibo', 'qqmail',
- 'fmessage', 'tmessage', 'qmessage', 'qqsync', 'floatbottle',
- 'lbsapp', 'shakeapp', 'medianote', 'qqfriend', 'readerapp',
- 'blogapp', 'facebookapp', 'masssendapp', 'meishiapp',
- 'feedsapp', 'voip', 'blogappweixin', 'weixin', 'brandsessionholder',
- 'weixinreminder', 'wxid_novlwrv3lqwv11', 'gh_22b87fa7cb3c',
- 'officialaccounts', 'notification_messages', 'wxid_novlwrv3lqwv11',
- 'gh_22b87fa7cb3c', 'wxitil', 'userexperience_alarm', 'notification_messages']
- self.contact_list = []
- self.public_list = []
- self.special_list = []
- self.group_list = []
- for contact in self.member_list:
- if contact['VerifyFlag'] & 8 != 0: # 公众号
- self.public_list.append(contact)
- self.account_info['normal_member'][contact['UserName']] = {'type': 'public', 'info': contact}
- elif contact['UserName'] in special_users: # 特殊账户
- self.special_list.append(contact)
- self.account_info['normal_member'][contact['UserName']] = {'type': 'special', 'info': contact}
- elif contact['UserName'].find('@@') != -1: # 群聊
- self.group_list.append(contact)
- self.account_info['normal_member'][contact['UserName']] = {'type': 'group', 'info': contact}
- elif contact['UserName'] == self.my_account['UserName']: # 自己
- self.account_info['normal_member'][contact['UserName']] = {'type': 'self', 'info': contact}
- else:
- self.contact_list.append(contact)
- self.account_info['normal_member'][contact['UserName']] = {'type': 'contact', 'info': contact}
- self.batch_get_group_members()
- for group in self.group_members:
- for member in self.group_members[group]:
- if member['UserName'] not in self.account_info:
- self.account_info['group_member'][member['UserName']] = \
- {'type': 'group_member', 'info': member, 'group': group}
- if self.DEBUG:
- with open(os.path.join(self.temp_pwd,'contact_list.json'), 'w') as f:
- f.write(json.dumps(self.contact_list))
- with open(os.path.join(self.temp_pwd,'special_list.json'), 'w') as f:
- f.write(json.dumps(self.special_list))
- with open(os.path.join(self.temp_pwd,'group_list.json'), 'w') as f:
- f.write(json.dumps(self.group_list))
- with open(os.path.join(self.temp_pwd,'public_list.json'), 'w') as f:
- f.write(json.dumps(self.public_list))
- with open(os.path.join(self.temp_pwd,'member_list.json'), 'w') as f:
- f.write(json.dumps(self.member_list))
- with open(os.path.join(self.temp_pwd,'group_users.json'), 'w') as f:
- f.write(json.dumps(self.group_members))
- with open(os.path.join(self.temp_pwd,'account_info.json'), 'w') as f:
- f.write(json.dumps(self.account_info))
- return True
- def get_big_contact(self):
- total_len = len(self.full_user_name_list)
- user_info_list = []
- # 一次拉取50个联系人的信息,包括所有的群聊,公众号,好友
- while self.cursor < total_len:
- cur_batch = self.full_user_name_list[self.cursor:(self.cursor+self.batch_count)]
- self.cursor += self.batch_count
- cur_batch = map(map_username_batch, cur_batch)
- user_info_list += self.batch_get_contact(cur_batch)
- log.info("Get batch contacts")
- self.member_list = user_info_list
- special_users = ['newsapp', 'filehelper', 'weibo', 'qqmail',
- 'fmessage', 'tmessage', 'qmessage', 'qqsync', 'floatbottle',
- 'lbsapp', 'shakeapp', 'medianote', 'qqfriend', 'readerapp',
- 'blogapp', 'facebookapp', 'masssendapp', 'meishiapp',
- 'feedsapp', 'voip', 'blogappweixin', 'weixin', 'brandsessionholder',
- 'weixinreminder', 'wxid_novlwrv3lqwv11',
- 'officialaccounts',
- 'gh_22b87fa7cb3c', 'wxitil', 'userexperience_alarm', 'notification_messages', 'notifymessage']
- self.contact_list = []
- self.public_list = []
- self.special_list = []
- self.group_list = []
- for i, contact in enumerate(self.member_list):
- if contact['VerifyFlag'] & 8 != 0: # 公众号
- self.public_list.append(contact)
- self.account_info['normal_member'][contact['UserName']] = {'type': 'public', 'info': contact}
- elif contact['UserName'] in special_users or self.wxid_list[i] in special_users: # 特殊账户
- self.special_list.append(contact)
- self.account_info['normal_member'][contact['UserName']] = {'type': 'special', 'info': contact}
- elif contact['UserName'].find('@@') != -1: # 群聊
- self.group_list.append(contact)
- self.account_info['normal_member'][contact['UserName']] = {'type': 'group', 'info': contact}
- elif contact['UserName'] == self.my_account['UserName']: # 自己
- self.account_info['normal_member'][contact['UserName']] = {'type': 'self', 'info': contact}
- else:
- self.contact_list.append(contact)
- self.account_info['normal_member'][contact['UserName']] = {'type': 'contact', 'info': contact}
- group_members = {}
- encry_chat_room_id = {}
- for group in self.group_list:
- gid = group['UserName']
- members = group['MemberList']
- group_members[gid] = members
- encry_chat_room_id[gid] = group['EncryChatRoomId']
- self.group_members = group_members
- self.encry_chat_room_id_list = encry_chat_room_id
- for group in self.group_members:
- for member in self.group_members[group]:
- if member['UserName'] not in self.account_info:
- self.account_info['group_member'][member['UserName']] = \
- {'type': 'group_member', 'info': member, 'group': group}
- if self.DEBUG:
- with open(os.path.join(self.temp_pwd,'contact_list.json'), 'w') as f:
- f.write(json.dumps(self.contact_list))
- with open(os.path.join(self.temp_pwd,'special_list.json'), 'w') as f:
- f.write(json.dumps(self.special_list))
- with open(os.path.join(self.temp_pwd,'group_list.json'), 'w') as f:
- f.write(json.dumps(self.group_list))
- with open(os.path.join(self.temp_pwd,'public_list.json'), 'w') as f:
- f.write(json.dumps(self.public_list))
- with open(os.path.join(self.temp_pwd,'member_list.json'), 'w') as f:
- f.write(json.dumps(self.member_list))
- with open(os.path.join(self.temp_pwd,'group_users.json'), 'w') as f:
- f.write(json.dumps(self.group_members))
- with open(os.path.join(self.temp_pwd,'account_info.json'), 'w') as f:
- f.write(json.dumps(self.account_info))
- log.info('Get %d contacts' % len(self.contact_list))
- log.info('Start to process messages .')
- return True
- def batch_get_contact(self, cur_batch):
- """批量获取成员信息"""
- url = self.base_uri + '/webwxbatchgetcontact?type=ex&r=%s&pass_ticket=%s' % (int(time.time()), self.pass_ticket)
- params = {
- 'BaseRequest': self.base_request,
- "Count": len(cur_batch),
- "List": cur_batch
- }
- r = self.session.post(url, data=json.dumps(params))
- r.encoding = 'utf-8'
- dic = json.loads(r.text)
- return dic['ContactList']
- def batch_get_group_members(self):
- """批量获取所有群聊成员信息"""
- url = self.base_uri + '/webwxbatchgetcontact?type=ex&r=%s&pass_ticket=%s' % (int(time.time()), self.pass_ticket)
- params = {
- 'BaseRequest': self.base_request,
- "Count": len(self.group_list),
- "List": [{"UserName": group['UserName'], "EncryChatRoomId": ""} for group in self.group_list]
- }
- r = self.session.post(url, data=json.dumps(params))
- r.encoding = 'utf-8'
- dic = json.loads(r.text)
- group_members = {}
- encry_chat_room_id = {}
- for group in dic['ContactList']:
- gid = group['UserName']
- members = group['MemberList']
- group_members[gid] = members
- encry_chat_room_id[gid] = group['EncryChatRoomId']
- self.group_members = group_members
- self.encry_chat_room_id_list = encry_chat_room_id
- def get_group_member_name(self, gid, uid):
- """
- 获取群聊中指定成员的名称信息
- :param gid: 群id
- :param uid: 群聊成员id
- :return: 名称信息,类似 {"display_name": "test_user", "nickname": "test", "remark_name": "for_test" }
- """
- if gid not in self.group_members:
- return None
- group = self.group_members[gid]
- for member in group:
- if member['UserName'] == uid:
- names = {}
- if 'RemarkName' in member and member['RemarkName']:
- names['remark_name'] = member['RemarkName']
- if 'NickName' in member and member['NickName']:
- names['nickname'] = member['NickName']
- if 'DisplayName' in member and member['DisplayName']:
- names['display_name'] = member['DisplayName']
- return names
- return None
- def get_contact_info(self, uid):
- return self.account_info['normal_member'].get(uid)
- def get_group_member_info(self, uid):
- return self.account_info['group_member'].get(uid)
- def get_contact_name(self, uid):
- info = self.get_contact_info(uid)
- if info is None:
- return None
- info = info['info']
- name = {}
- if 'RemarkName' in info and info['RemarkName']:
- name['remark_name'] = info['RemarkName']
- if 'NickName' in info and info['NickName']:
- name['nickname'] = info['NickName']
- if 'DisplayName' in info and info['DisplayName']:
- name['display_name'] = info['DisplayName']
- if len(name) == 0:
- return None
- else:
- return name
- @staticmethod
- def get_contact_prefer_name(name):
- if name is None:
- return None
- if 'remark_name' in name:
- return name['remark_name']
- if 'nickname' in name:
- return name['nickname']
- if 'display_name' in name:
- return name['display_name']
- return None
- @staticmethod
- def get_group_member_prefer_name(name):
- if name is None:
- return None
- if 'remark_name' in name:
- return name['remark_name']
- if 'display_name' in name:
- return name['display_name']
- if 'nickname' in name:
- return name['nickname']
- return None
- def get_user_type(self, wx_user_id):
- """
- 获取特定账号与自己的关系
- :param wx_user_id: 账号id:
- :return: 与当前账号的关系
- """
- for account in self.contact_list:
- if wx_user_id == account['UserName']:
- return 'contact'
- for account in self.public_list:
- if wx_user_id == account['UserName']:
- return 'public'
- for account in self.special_list:
- if wx_user_id == account['UserName']:
- return 'special'
- for account in self.group_list:
- if wx_user_id == account['UserName']:
- return 'group'
- for group in self.group_members:
- for member in self.group_members[group]:
- if member['UserName'] == wx_user_id:
- return 'group_member'
- return 'unknown'
- def is_contact(self, uid):
- for account in self.contact_list:
- if uid == account['UserName']:
- return True
- return False
- def is_public(self, uid):
- for account in self.public_list:
- if uid == account['UserName']:
- return True
- return False
- def is_special(self, uid):
- for account in self.special_list:
- if uid == account['UserName']:
- return True
- return False
- def handle_msg_all(self, msg):
- """
- 处理所有消息,请子类化后覆盖此函数
- msg:
- msg_id -> 消息id
- msg_type_id -> 消息类型id
- user -> 发送消息的账号id
- content -> 消息内容
- :param msg: 收到的消息
- """
- pass
- @staticmethod
- def proc_at_info(msg):
- if not msg:
- return '', []
- segs = msg.split(u'\u2005')
- str_msg_all = ''
- str_msg = ''
- infos = []
- if len(segs) > 1:
- for i in range(0, len(segs) - 1):
- segs[i] += u'\u2005'
- pm = re.search(u'@.*\u2005', segs[i]).group()
- if pm:
- name = pm[1:-1]
- string = segs[i].replace(pm, '')
- str_msg_all += string + '@' + name + ' '
- str_msg += string
- if string:
- infos.append({'type': 'str', 'value': string})
- infos.append({'type': 'at', 'value': name})
- else:
- infos.append({'type': 'str', 'value': segs[i]})
- str_msg_all += segs[i]
- str_msg += segs[i]
- str_msg_all += segs[-1]
- str_msg += segs[-1]
- infos.append({'type': 'str', 'value': segs[-1]})
- else:
- infos.append({'type': 'str', 'value': segs[-1]})
- str_msg_all = msg
- str_msg = msg
- return str_msg_all.replace(u'\u2005', ''), str_msg.replace(u'\u2005', ''), infos
- def extract_msg_content(self, msg_type_id, msg):
- """
- content_type_id:
- 0 -> Text
- 1 -> Location
- 3 -> Image
- 4 -> Voice
- 5 -> Recommend
- 6 -> Animation
- 7 -> Share
- 8 -> Video
- 9 -> VideoCall
- 10 -> Redraw
- 11 -> Empty
- 99 -> Unknown
- :param msg_type_id: 消息类型id
- :param msg: 消息结构体
- :return: 解析的消息
- """
- mtype = msg['MsgType']
- content = HTMLParser.HTMLParser().unescape(msg['Content'])
- msg_id = msg['MsgId']
- msg_content = {}
- if msg_type_id == 0:
- return {'type': 11, 'data': ''}
- elif msg_type_id == 2: # File Helper
- return {'type': 0, 'data': content.replace('<br/>', '\n')}
- elif msg_type_id == 3: # 群聊
- sp = content.find('<br/>')
- uid = content[:sp]
- content = content[sp:]
- content = content.replace('<br/>', '')
- uid = uid[:-1]
- name = self.get_contact_prefer_name(self.get_contact_name(uid))
- if not name:
- name = self.get_group_member_prefer_name(self.get_group_member_name(msg['FromUserName'], uid))
- if not name:
- name = 'unknown'
- msg_content['user'] = {'id': uid, 'name': name}
- else: # Self, Contact, Special, Public, Unknown
- pass
- msg_prefix = (msg_content['user']['name'] + ':') if 'user' in msg_content else ''
- if mtype == 1:
- if content.find('http://weixin.qq.com/cgi-bin/redirectforward?args=') != -1:
- r = self.session.get(content)
- r.encoding = 'gbk'
- data = r.text
- pos = self.search_content('title', data, 'xml')
- msg_content['type'] = 1
- msg_content['data'] = pos
- msg_content['detail'] = data
- if self.DEBUG:
- log.info(' %s[Location] %s ' % (msg_prefix, pos))
- else:
- msg_content['type'] = 0
- if msg_type_id == 3 or (msg_type_id == 1 and msg['ToUserName'][:2] == '@@'): # Group text message
- msg_infos = self.proc_at_info(content)
- str_msg_all = msg_infos[0]
- str_msg = msg_infos[1]
- detail = msg_infos[2]
- msg_content['data'] = str_msg_all
- msg_content['detail'] = detail
- msg_content['desc'] = str_msg
- else:
- msg_content['data'] = content
- if self.DEBUG:
- try:
- log.info(' %s[Text] %s' % (msg_prefix, msg_content['data']))
- except UnicodeEncodeError:
- log.info(' %s[Text] (illegal text).' % msg_prefix)
- elif mtype == 3:
- msg_content['type'] = 3
- msg_content['data'] = self.get_msg_img_url(msg_id)
- msg_content['img'] = binascii.hexlify(self.session.get(msg_content['data']).content)
- if self.DEBUG:
- image = self.get_msg_img(msg_id)
- log.info(' %s[Image] %s' % (msg_prefix, image))
- elif mtype == 34:
- msg_content['type'] = 4
- msg_content['data'] = self.get_voice_url(msg_id)
- msg_content['voice'] = binascii.hexlify(self.session.get(msg_content['data']).content)
- if self.DEBUG:
- voice = self.get_voice(msg_id)
- log.info(' %s[Voice] %s' % (msg_prefix, voice))
- elif mtype == 37:
- msg_content['type'] = 37
- msg_content['data'] = msg['RecommendInfo']
- if self.DEBUG:
- log.info(' %s[useradd] %s' % (msg_prefix,msg['RecommendInfo']['NickName']))
- elif mtype == 42:
- msg_content['type'] = 5
- info = msg['RecommendInfo']
- msg_content['data'] = {'nickname': info['NickName'],
- 'alias': info['Alias'],
- 'province': info['Province'],
- 'city': info['City'],
- 'gender': ['unknown', 'male', 'female'][info['Sex']]}
- if self.DEBUG:
- log.info(' %s[Recommend]' % msg_prefix)
- log.info(' -----------------------------')
- log.info(' | NickName: %s' % info['NickName'])
- log.info(' | Alias: %s' % info['Alias'])
- log.info(' | Local: %s %s' % (info['Province'], info['City']))
- log.info(' | Gender: %s' % ['unknown', 'male', 'female'][info['Sex']])
- log.info(' -----------------------------')
- elif mtype == 47:
- msg_content['type'] = 6
- msg_content['data'] = self.search_content('cdnurl', content)
- if self.DEBUG:
- log.info(' %s[Animation] %s' % (msg_prefix, msg_content['data']))
- elif mtype == 49:
- msg_content['type'] = 7
- if msg['AppMsgType'] == 3:
- app_msg_type = 'music'
- elif msg['AppMsgType'] == 5:
- app_msg_type = 'link'
- elif msg['AppMsgType'] == 7:
- app_msg_type = 'weibo'
- else:
- app_msg_type = 'unknown'
- msg_content['data'] = {'type': app_msg_type,
- 'title': msg['FileName'],
- 'desc': self.search_content('des', content, 'xml'),
- 'url': msg['Url'],
- 'from': self.search_content('appname', content, 'xml'),
- 'content': msg.get('Content') # 有的公众号会发一次性3 4条链接一个大图,如果只url那只能获取第一条,content里面有所有的链接
- }
- if self.DEBUG:
- log.info(' %s[Share] %s' % (msg_prefix, app_msg_type))
- log.info(' --------------------------')
- log.info(' | title: %s' % msg['FileName'])
- log.info(' | desc: %s' % self.search_content('des', content, 'xml'))
- log.info(' | link: %s' % msg['Url'])
- log.info(' | from: %s' % self.search_content('appname', content, 'xml'))
- log.info(' | content: %s' % (msg.get('content')[:20] if msg.get('content') else "unknown"))
- log.info(' --------------------------')
- elif mtype == 62:
- msg_content['type'] = 8
- msg_content['data'] = content
- if self.DEBUG:
- log.info(' %s[Video] Please check on mobiles' % msg_prefix)
- elif mtype == 53:
- msg_content['type'] = 9
- msg_content['data'] = content
- if self.DEBUG:
- log.info(' %s[Video Call]' % msg_prefix)
- elif mtype == 10002:
- msg_content['type'] = 10
- msg_content['data'] = content
- if self.DEBUG:
- log.info(' %s[Redraw]' % msg_prefix)
- elif mtype == 10000: # unknown, maybe red packet, or group invite
- msg_content['type'] = 12
- msg_content['data'] = msg['Content']
- if self.DEBUG:
- log.info(' [Unknown]')
- elif mtype == 43:
- msg_content['type'] = 13
- msg_content['data'] = self.get_video_url(msg_id)
- if self.DEBUG:
- log.info(' %s[video] %s' % (msg_prefix, msg_content['data']))
- else:
- msg_content['type'] = 99
- msg_content['data'] = content
- if self.DEBUG:
- log.info(' %s[Unknown]' % msg_prefix)
- return msg_content
- def handle_msg(self, r):
- """
- 处理原始微信消息的内部函数
- msg_type_id:
- 0 -> Init
- 1 -> Self
- 2 -> FileHelper
- 3 -> Group
- 4 -> Contact
- 5 -> Public
- 6 -> Special
- 99 -> Unknown
- :param r: 原始微信消息
- """
- for msg in r['AddMsgList']:
- user = {'id': msg['FromUserName'], 'name': 'unknown'}
- if msg['MsgType'] == 51 and msg['StatusNotifyCode'] == 4: # init message
- msg_type_id = 0
- user['name'] = 'system'
- # 会获取所有联系人的username 和 wxid,但是会收到3次这个消息,只取第一次
- if self.is_big_contact and len(self.full_user_name_list) == 0:
- self.full_user_name_list = msg['StatusNotifyUserName'].split(",")
- self.wxid_list = re.search(r"username>(.*?)</username", msg["Content"]).group(1).split(",")
- with open(os.path.join(self.temp_pwd,'UserName.txt'), 'w') as f:
- f.write(msg['StatusNotifyUserName'])
- with open(os.path.join(self.temp_pwd,'wxid.txt'), 'w') as f:
- f.write(json.dumps(self.wxid_list))
- log.info('Contact list is too big. Now start to fetch member list .')
- self.get_big_contact()
- elif msg['MsgType'] == 37: # friend request
- msg_type_id = 37
- pass
- # content = msg['Content']
- # username = content[content.index('fromusername='): content.index('encryptusername')]
- # username = username[username.index('"') + 1: username.rindex('"')]
- # print u'[Friend Request]'
- # print u' Nickname:' + msg['RecommendInfo']['NickName']
- # print u' 附加消息:'+msg['RecommendInfo']['Content']
- # # print u'Ticket:'+msg['RecommendInfo']['Ticket'] # Ticket添加好友时要用
- # print u' 微信号:'+username #未设置微信号的 腾讯会自动生成一段微信ID 但是无法通过搜索 搜索到此人
- elif msg['FromUserName'] == self.my_account['UserName']: # Self
- msg_type_id = 1
- user['name'] = 'self'
- elif msg['ToUserName'] == 'filehelper': # File Helper
- msg_type_id = 2
- user['name'] = 'file_helper'
- elif msg['FromUserName'][:2] == '@@': # Group
- msg_type_id = 3
- user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id']))
- elif self.is_contact(msg['FromUserName']): # Contact
- msg_type_id = 4
- user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id']))
- elif self.is_public(msg['FromUserName']): # Public
- msg_type_id = 5
- user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id']))
- elif self.is_special(msg['FromUserName']): # Special
- msg_type_id = 6
- user['name'] = self.get_contact_prefer_name(self.get_contact_name(user['id']))
- else:
- msg_type_id = 99
- user['name'] = 'unknown'
- if not user['name']:
- user['name'] = 'unknown'
- user['name'] = HTMLParser.HTMLParser().unescape(user['name'])
- if self.DEBUG and msg_type_id != 0:
- log.info(u'[MSG] %s:' % user['name'])
- content = self.extract_msg_content(msg_type_id, msg)
- message = {'msg_type_id': msg_type_id,
- 'msg_id': msg['MsgId'],
- 'content': content,
- 'to_user_id': msg['ToUserName'],
- 'user': user}
- self.handle_msg_all(message)
- def schedule(self):
- """
- 做任务型事情的函数,如果需要,可以在子类中覆盖此函数
- 此函数在处理消息的间隙被调用,请不要长时间阻塞此函数
- """
- pass
- def proc_msg(self):
- self.test_sync_check()
- while True:
- check_time = time.time()
- try:
- [retcode, selector] = self.sync_check()
- # log.debug('sync_check:', retcode, selector)
- if retcode == '1100': # 从微信客户端上登出
- break
- elif retcode == '1101': # 从其它设备上登了网页微信
- break
- elif retcode == '0':
- if selector == '2': # 有新消息
- r = self.sync()
- if r is not None:
- self.handle_msg(r)
- elif selector == '3': # 未知
- r = self.sync()
- if r is not None:
- self.handle_msg(r)
- elif selector == '4': # 通讯录更新
- r = self.sync()
- if r is not None:
- self.get_contact()
- elif selector == '6': # 可能是红包
- r = self.sync()
- if r is not None:
- self.handle_msg(r)
- elif selector == '7': # 在手机上操作了微信
- r = self.sync()
- if r is not None:
- self.handle_msg(r)
- elif selector == '0': # 无事件
- pass
- else:
- log.debug('sync_check:', retcode, selector)
- r = self.sync()
- if r is not None:
- self.handle_msg(r)
- else:
- log.debug('sync_check:', retcode, selector)
- time.sleep(10)
- self.schedule()
- except:
- log.exception('Except in proc_msg')
- check_time = time.time() - check_time
- if check_time < 0.8:
- time.sleep(1 - check_time)
- def apply_useradd_requests(self,RecommendInfo):
- url = self.base_uri + '/webwxverifyuser?r='+str(int(time.time()))+'&lang=zh_CN'
- params = {
- "BaseRequest": self.base_request,
- "Opcode": 3,
- "VerifyUserListSize": 1,
- "VerifyUserList": [
- {
- "Value": RecommendInfo['UserName'],
- "VerifyUserTicket": RecommendInfo['Ticket'] }
- ],
- "VerifyContent": "",
- "SceneListCount": 1,
- "SceneList": [
- 33
- ],
- "skey": self.skey
- }
- headers = {'content-type': 'application/json; charset=UTF-8'}
- data = json.dumps(params, ensure_ascii=False).encode('utf8')
- try:
- r = self.session.post(url, data=data, headers=headers)
- except (ConnectionError, ReadTimeout):
- return False
- dic = r.json()
- return dic['BaseResponse']['Ret'] == 0
- def add_groupuser_to_friend_by_uid(self,uid,VerifyContent):
- """
- 主动向群内人员打招呼,提交添加好友请求
- uid-群内人员得uid VerifyContent-好友招呼内容
- 慎用此接口!封号后果自负!慎用此接口!封号后果自负!慎用此接口!封号后果自负!
- """
- if self.is_contact(uid):
- return True
- url = self.base_uri + '/webwxverifyuser?r='+str(int(time.time()))+'&lang=zh_CN'
- params ={
- "BaseRequest": self.base_request,
- "Opcode": 2,
- "VerifyUserListSize": 1,
- "VerifyUserList": [
- {
- "Value": uid,
- "VerifyUserTicket": ""
- }
- ],
- "VerifyContent": VerifyContent,
- "SceneListCount": 1,
- "SceneList": [
- 33
- ],
- "skey": self.skey
- }
- headers = {'content-type': 'application/json; charset=UTF-8'}
- data = json.dumps(params, ensure_ascii=False).encode('utf8')
- try:
- r = self.session.post(url, data=data, headers=headers)
- except (ConnectionError, ReadTimeout):
- return False
- dic = r.json()
- return dic['BaseResponse']['Ret'] == 0
- def add_friend_to_group(self,uid,group_name):
- """
- 将好友加入到群聊中
- """
- gid = ''
- # 通过群名获取群id,群没保存到通讯录中的话无法添加哦
- for group in self.group_list:
- if group['NickName'] == group_name:
- gid = group['UserName']
- if gid == '':
- return False
- # 通过群id判断uid是否在群中
- for user in self.group_members[gid]:
- if user['UserName'] == uid:
- # 已经在群里面了,不用加了
- return True
- url = self.base_uri + '/webwxupdatechatroom?fun=addmember&pass_ticket=%s' % self.pass_ticket
- params ={
- "AddMemberList": uid,
- "ChatRoomName": gid,
- "BaseRequest": self.base_request
- }
- headers = {'content-type': 'application/json; charset=UTF-8'}
- data = json.dumps(params, ensure_ascii=False).encode('utf8')
- try:
- r = self.session.post(url, data=data, headers=headers)
- except (ConnectionError, ReadTimeout):
- return False
- dic = r.json()
- return dic['BaseResponse']['Ret'] == 0
- def invite_friend_to_group(self,uid,group_name):
- """
- 将好友加入到群中。对人数多的群,需要调用此方法。
- 拉人时,可以先尝试使用add_friend_to_group方法,当调用失败(Ret=1)时,再尝试调用此方法。
- """
- gid = ''
- # 通过群名获取群id,群没保存到通讯录中的话无法添加哦
- for group in self.group_list:
- if group['NickName'] == group_name:
- gid = group['UserName']
- if gid == '':
- return False
- # 通过群id判断uid是否在群中
- for user in self.group_members[gid]:
- if user['UserName'] == uid:
- # 已经在群里面了,不用加了
- return True
- url = self.base_uri + '/webwxupdatechatroom?fun=invitemember&pass_ticket=%s' % self.pass_ticket
- params = {
- "InviteMemberList": uid,
- "ChatRoomName": gid,
- "BaseRequest": self.base_request
- }
- headers = {'content-type': 'application/json; charset=UTF-8'}
- data = json.dumps(params, ensure_ascii=False).encode('utf8')
- try:
- r = self.session.post(url, data=data, headers=headers)
- except (ConnectionError, ReadTimeout):
- return False
- dic = r.json()
- return dic['BaseResponse']['Ret'] == 0
- def delete_user_from_group(self,uname,gid):
- """
- 将群用户从群中剔除,只有群管理员有权限
- """
- uid = ""
- for user in self.group_members[gid]:
- if user['NickName'] == uname:
- uid = user['UserName']
- if uid == "":
- return False
- url = self.base_uri + '/webwxupdatechatroom?fun=delmember&pass_ticket=%s' % self.pass_ticket
- params ={
- "DelMemberList": uid,
- "ChatRoomName": gid,
- "BaseRequest": self.base_request
- }
- headers = {'content-type': 'application/json; charset=UTF-8'}
- data = json.dumps(params, ensure_ascii=False).encode('utf8')
- try:
- r = self.session.post(url, data=data, headers=headers)
- except (ConnectionError, ReadTimeout):
- return False
- dic = r.json()
- return dic['BaseResponse']['Ret'] == 0
- def set_group_name(self,gid,gname):
- """
- 设置群聊名称
- """
- url = self.base_uri + '/webwxupdatechatroom?fun=modtopic&pass_ticket=%s' % self.pass_ticket
- params ={
- "NewTopic": gname,
- "ChatRoomName": gid,
- "BaseRequest": self.base_request
- }
- headers = {'content-type': 'application/json; charset=UTF-8'}
- data = json.dumps(params, ensure_ascii=False).encode('utf8')
- try:
- r = self.session.post(url, data=data, headers=headers)
- except (ConnectionError, ReadTimeout):
- return False
- dic = r.json()
- return dic['BaseResponse']['Ret'] == 0
- def send_msg_by_uid(self, word, dst='filehelper'):
- url = self.base_uri + '/webwxsendmsg?pass_ticket=%s' % self.pass_ticket
- msg_id = str(int(time.time() * 1000)) + str(random.random())[:5].replace('.', '')
- word = self.to_unicode(word)
- params = {
- 'BaseRequest': self.base_request,
- 'Msg': {
- "Type": 1,
- "Content": word,
- "FromUserName": self.my_account['UserName'],
- "ToUserName": dst,
- "LocalID": msg_id,
- "ClientMsgId": msg_id
- }
- }
- headers = {'content-type': 'application/json; charset=UTF-8'}
- data = json.dumps(params, ensure_ascii=False).encode('utf8')
- try:
- r = self.session.post(url, data=data, headers=headers)
- except (ConnectionError, ReadTimeout):
- return False
- dic = r.json()
- return dic['BaseResponse']['Ret'] == 0
- def upload_media(self, fpath, is_img=False):
- if not os.path.exists(fpath):
- log.error('File not exists.')
- return None
- url_1 = 'https://file.'+self.base_host+'/cgi-bin/mmwebwx-bin/webwxuploadmedia?f=json'
- url_2 = 'https://file2.'+self.base_host+'/cgi-bin/mmwebwx-bin/webwxuploadmedia?f=json'
- flen = str(os.path.getsize(fpath))
- ftype = mimetypes.guess_type(fpath)[0] or 'application/octet-stream'
- files = {
- 'id': (None, 'WU_FILE_%s' % str(self.file_index)),
- 'name': (None, os.path.basename(fpath)),
- 'type': (None, ftype),
- 'lastModifiedDate': (None, time.strftime('%m/%d/%Y, %H:%M:%S GMT+0800 (CST)')),
- 'size': (None, flen),
- 'mediatype': (None, 'pic' if is_img else 'doc'),
- 'uploadmediarequest': (None, json.dumps({
- 'BaseRequest': self.base_request,
- 'ClientMediaId': int(time.time()),
- 'TotalLen': flen,
- 'StartPos': 0,
- 'DataLen': flen,
- 'MediaType': 4,
- })),
- 'webwx_data_ticket': (None, self.session.cookies['webwx_data_ticket']),
- 'pass_ticket': (None, self.pass_ticket),
- 'filename': (os.path.basename(fpath), open(fpath, 'rb'),ftype.split('/')[1]),
- }
- self.file_index += 1
- try:
- r = self.session.post(url_1, files=files)
- if json.loads(r.text)['BaseResponse']['Ret'] != 0:
- # 当file返回值不为0时则为上传失败,尝试第二服务器上传
- r = self.session.post(url_2, files=files)
- if json.loads(r.text)['BaseResponse']['Ret'] != 0:
- log.error('Upload media failure.')
- return None
- mid = json.loads(r.text)['MediaId']
- return mid
- except:
- return None
- def send_file_msg_by_uid(self, fpath, uid):
- mid = self.upload_media(fpath)
- if mid is None or not mid:
- return False
- url = self.base_uri + '/webwxsendappmsg?fun=async&f=json&pass_ticket=' + self.pass_ticket
- msg_id = str(int(time.time() * 1000)) + str(random.random())[:5].replace('.', '')
- data = {
- 'BaseRequest': self.base_request,
- 'Msg': {
- 'Type': 6,
- 'Content': ("<appmsg appid='wxeb7ec651dd0aefa9' sdkver=''><title>%s</title><des></des><action></action><type>6</type><content></content><url></url><lowurl></lowurl><appattach><totallen>%s</totallen><attachid>%s</attachid><fileext>%s</fileext></appattach><extinfo></extinfo></appmsg>" % (os.path.basename(fpath).encode('utf-8'), str(os.path.getsize(fpath)), mid, fpath.split('.')[-1])).encode('utf8'),
- 'FromUserName': self.my_account['UserName'],
- 'ToUserName': uid,
- 'LocalID': msg_id,
- 'ClientMsgId': msg_id, }, }
- try:
- r = self.session.post(url, data=json.dumps(data))
- res = json.loads(r.text)
- if res['BaseResponse']['Ret'] == 0:
- return True
- else:
- return False
- except:
- return False
- def send_img_msg_by_uid(self, fpath, uid):
- mid = self.upload_media(fpath, is_img=True)
- if mid is None:
- log.error("upload media failed")
- return False
- url = self.base_uri + '/webwxsendmsgimg?fun=async&f=json'
- data = {
- 'BaseRequest': self.base_request,
- 'Msg': {
- 'Type': 3,
- 'MediaId': mid,
- 'FromUserName': self.my_account['UserName'],
- 'ToUserName': uid,
- 'LocalID': str(time.time() * 1e7),
- 'ClientMsgId': str(time.time() * 1e7), }, }
- if fpath[-4:] == '.gif':
- url = self.base_uri + '/webwxsendemoticon?fun=sys'
- data['Msg']['Type'] = 47
- data['Msg']['EmojiFlag'] = 2
- try:
- r = self.session.post(url, data=json.dumps(data))
- res = json.loads(r.text)
- if res['BaseResponse']['Ret'] == 0:
- return True
- else:
- return False
- except:
- return False
- def get_user_id(self, name):
- if name == '':
- return None
- name = self.to_unicode(name)
- for contact in self.contact_list:
- if 'RemarkName' in contact and contact['RemarkName'] == name:
- return contact['UserName']
- elif 'NickName' in contact and contact['NickName'] == name:
- return contact['UserName']
- elif 'DisplayName' in contact and contact['DisplayName'] == name:
- return contact['UserName']
- for group in self.group_list:
- if 'RemarkName' in group and group['RemarkName'] == name:
- return group['UserName']
- if 'NickName' in group and group['NickName'] == name:
- return group['UserName']
- if 'DisplayName' in group and group['DisplayName'] == name:
- return group['UserName']
- return ''
- def send_msg(self, name, word, isfile=False):
- uid = self.get_user_id(name)
- if uid is not None:
- if isfile:
- with open(word, 'r') as f:
- result = True
- for line in f.readlines():
- line = line.replace('\n', '')
- log.info('-> ' + name + ': ' + line)
- if self.send_msg_by_uid(line, uid):
- pass
- else:
- result = False
- time.sleep(1)
- return result
- else:
- word = self.to_unicode(word)
- if self.send_msg_by_uid(word, uid):
- return True
- else:
- return False
- else:
- if self.DEBUG:
- log.error('This user does not exist .')
- return True
- @staticmethod
- def search_content(key, content, fmat='attr'):
- if fmat == 'attr':
- pm = re.search(key + '\s?=\s?"([^"<]+)"', content)
- if pm:
- return pm.group(1)
- elif fmat == 'xml':
- pm = re.search('<{0}>([^<]+)</{0}>'.format(key), content)
- if pm:
- return pm.group(1)
- return 'unknown'
- def login(self):
- self.get_uuid()
- self.gen_qr_code(os.path.join(self.temp_pwd,'wxqr.png'))
- log.info('Please use WeChat to scan the QR code .')
- result = self.wait4login()
- if result != SUCCESS:
- log.error('Web WeChat login failed. failed code=%s' % (result,))
- return False
- if self._login():
- log.info('Web WeChat login succeed .')
- else:
- log.error('Web WeChat login failed .')
- return False
- if self.init():
- log.info('Web WeChat init succeed .')
- return True
- else:
- log.error('Web WeChat init failed')
- return False
- def run(self):
- if not self.login():
- log.error('login failed.')
- return False
- self.status_notify()
- if self.get_contact():
- log.info('Get %d contacts' % len(self.contact_list))
- log.info('Start to process messages .')
- self.proc_msg()
- def get_uuid(self):
- url = 'https://login.weixin.qq.com/jslogin'
- params = {
- 'appid': 'wx782c26e4c19acffb',
- 'fun': 'new',
- 'lang': 'zh_CN',
- '_': int(time.time()) * 1000 + random.randint(1, 999),
- }
- r = self.session.get(url, params=params)
- r.encoding = 'utf-8'
- data = r.text
- regx = r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(\S+?)"'
- pm = re.search(regx, data)
- if pm:
- code = pm.group(1)
- self.uuid = pm.group(2)
- return code == '200'
- return False
- def gen_qr_code(self, qr_file_path):
- string = 'https://login.weixin.qq.com/l/' + self.uuid
- qr = pyqrcode.create(string)
- if self.conf['qr'] == 'png':
- qr.png(qr_file_path, scale=8)
- show_image(qr_file_path)
- # img = Image.open(qr_file_path)
- # img.show()
- elif self.conf['qr'] == 'tty':
- print(qr.terminal(quiet_zone=1))
- def do_request(self, url):
- r = self.session.get(url)
- r.encoding = 'utf-8'
- data = r.text
- param = re.search(r'window.code=(\d+);', data)
- code = param.group(1)
- return code, data
- def wait4login(self):
- """
- http comet:
- tip=1, 等待用户扫描二维码,
- 201: scaned
- 408: timeout
- tip=0, 等待用户确认登录,
- 200: confirmed
- """
- LOGIN_TEMPLATE = 'https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?tip=%s&uuid=%s&_=%s'
- tip = 1
- try_later_secs = 1
- MAX_RETRY_TIMES = 10
- code = UNKONWN
- retry_time = MAX_RETRY_TIMES
- while retry_time > 0:
- url = LOGIN_TEMPLATE % (tip, self.uuid, int(time.time()))
- code, data = self.do_request(url)
- if code == SCANED:
- log.info('Please confirm to login .')
- tip = 0
- elif code == SUCCESS: # 确认登录成功
- param = re.search(r'window.redirect_uri="(\S+?)";', data)
- redirect_uri = param.group(1) + '&fun=new'
- self.redirect_uri = redirect_uri
- self.base_uri = redirect_uri[:redirect_uri.rfind('/')]
- temp_host = self.base_uri[8:]
- self.base_host = temp_host[:temp_host.find("/")]
- return code
- elif code == TIMEOUT:
- log.error('WeChat login timeout. retry in %s secs later...' % (try_later_secs,))
- tip = 1 # 重置
- retry_time -= 1
- time.sleep(try_later_secs)
- else:
- log.error('WeChat login exception return_code=%s. retry in %s secs later...' %
- (code, try_later_secs))
- tip = 1
- retry_time -= 1
- time.sleep(try_later_secs)
- return code
- def _login(self):
- if len(self.redirect_uri) < 4:
- log.error('Login failed due to network problem, please try again.')
- return False
- r = self.session.get(self.redirect_uri)
- r.encoding = 'utf-8'
- data = r.text
- doc = xml.dom.minidom.parseString(data)
- root = doc.documentElement
- for node in root.childNodes:
- if node.nodeName == 'skey':
- self.skey = node.childNodes[0].data
- elif node.nodeName == 'wxsid':
- self.sid = node.childNodes[0].data
- elif node.nodeName == 'wxuin':
- self.uin = node.childNodes[0].data
- elif node.nodeName == 'pass_ticket':
- self.pass_ticket = node.childNodes[0].data
- if '' in (self.skey, self.sid, self.uin, self.pass_ticket):
- return False
- self.base_request = {
- 'Uin': self.uin,
- 'Sid': self.sid,
- 'Skey': self.skey,
- 'DeviceID': self.device_id,
- }
- return True
- def init(self):
- url = self.base_uri + '/webwxinit?r=%i&lang=en_US&pass_ticket=%s' % (int(time.time()), self.pass_ticket)
- params = {
- 'BaseRequest': self.base_request
- }
- r = self.session.post(url, data=json.dumps(params))
- r.encoding = 'utf-8'
- dic = json.loads(r.text)
- self.sync_key = dic['SyncKey']
- self.my_account = dic['User']
- self.sync_key_str = '|'.join([str(keyVal['Key']) + '_' + str(keyVal['Val'])
- for keyVal in self.sync_key['List']])
- return dic['BaseResponse']['Ret'] == 0
- def status_notify(self):
- url = self.base_uri + '/webwxstatusnotify?lang=zh_CN&pass_ticket=%s' % self.pass_ticket
- self.base_request['Uin'] = int(self.base_request['Uin'])
- params = {
- 'BaseRequest': self.base_request,
- "Code": 3,
- "FromUserName": self.my_account['UserName'],
- "ToUserName": self.my_account['UserName'],
- "ClientMsgId": int(time.time())
- }
- r = self.session.post(url, data=json.dumps(params))
- r.encoding = 'utf-8'
- dic = json.loads(r.text)
- return dic['BaseResponse']['Ret'] == 0
- def test_sync_check(self):
- for host1 in ['webpush.', 'webpush2.']:
- self.sync_host = host1+self.base_host
- try:
- retcode = self.sync_check()[0]
- except:
- retcode = -1
- if retcode == '0':
- return True
- return False
- def sync_check(self):
- params = {
- 'r': int(time.time()),
- 'sid': self.sid,
- 'uin': self.uin,
- 'skey': self.skey,
- 'deviceid': self.device_id,
- 'synckey': self.sync_key_str,
- '_': int(time.time()),
- }
- url = 'https://' + self.sync_host + '/cgi-bin/mmwebwx-bin/synccheck?' + urlencode(params)
- try:
- r = self.session.get(url, timeout=60)
- r.encoding = 'utf-8'
- data = r.text
- pm = re.search(r'window.synccheck=\{retcode:"(\d+)",selector:"(\d+)"\}', data)
- retcode = pm.group(1)
- selector = pm.group(2)
- return [retcode, selector]
- except:
- return [-1, -1]
- def sync(self):
- url = self.base_uri + '/webwxsync?sid=%s&skey=%s&lang=en_US&pass_ticket=%s' \
- % (self.sid, self.skey, self.pass_ticket)
- params = {
- 'BaseRequest': self.base_request,
- 'SyncKey': self.sync_key,
- 'rr': ~int(time.time())
- }
- try:
- r = self.session.post(url, data=json.dumps(params), timeout=60)
- r.encoding = 'utf-8'
- dic = json.loads(r.text)
- if dic['BaseResponse']['Ret'] == 0:
- self.sync_key = dic['SyncKey']
- self.sync_key_str = '|'.join([str(keyVal['Key']) + '_' + str(keyVal['Val'])
- for keyVal in self.sync_key['List']])
- return dic
- except:
- return None
- def get_icon(self, uid, gid=None):
- """
- 获取联系人或者群聊成员头像
- :param uid: 联系人id
- :param gid: 群id,如果为非None获取群中成员头像,如果为None则获取联系人头像
- """
- if gid is None:
- url = self.base_uri + '/webwxgeticon?username=%s&skey=%s' % (uid, self.skey)
- else:
- url = self.base_uri + '/webwxgeticon?username=%s&skey=%s&chatroomid=%s' % (
- uid, self.skey, self.encry_chat_room_id_list[gid])
- r = self.session.get(url)
- data = r.content
- fn = 'icon_' + uid + '.jpg'
- with open(os.path.join(self.temp_pwd,fn), 'wb') as f:
- f.write(data)
- return fn
- def get_head_img(self, uid):
- """
- 获取群头像
- :param uid: 群uid
- """
- url = self.base_uri + '/webwxgetheadimg?username=%s&skey=%s' % (uid, self.skey)
- r = self.session.get(url)
- data = r.content
- fn = 'head_' + uid + '.jpg'
- with open(os.path.join(self.temp_pwd,fn), 'wb') as f:
- f.write(data)
- return fn
- def get_msg_img_url(self, msgid):
- return self.base_uri + '/webwxgetmsgimg?MsgID=%s&skey=%s' % (msgid, self.skey)
- def get_msg_img(self, msgid):
- """
- 获取图片消息,下载图片到本地
- :param msgid: 消息id
- :return: 保存的本地图片文件路径
- """
- url = self.base_uri + '/webwxgetmsgimg?MsgID=%s&skey=%s' % (msgid, self.skey)
- r = self.session.get(url)
- data = r.content
- fn = 'img_' + msgid + '.jpg'
- with open(os.path.join(self.temp_pwd,fn), 'wb') as f:
- f.write(data)
- return fn
- def get_voice_url(self, msgid):
- return self.base_uri + '/webwxgetvoice?msgid=%s&skey=%s' % (msgid, self.skey)
- def get_voice(self, msgid):
- """
- 获取语音消息,下载语音到本地
- :param msgid: 语音消息id
- :return: 保存的本地语音文件路径
- """
- url = self.base_uri + '/webwxgetvoice?msgid=%s&skey=%s' % (msgid, self.skey)
- r = self.session.get(url)
- data = r.content
- fn = 'voice_' + msgid + '.mp3'
- with open(os.path.join(self.temp_pwd,fn), 'wb') as f:
- f.write(data)
- return fn
- def get_video_url(self, msgid):
- return self.base_uri + '/webwxgetvideo?msgid=%s&skey=%s' % (msgid, self.skey)
- def get_video(self, msgid):
- """
- 获取视频消息,下载视频到本地
- :param msgid: 视频消息id
- :return: 保存的本地视频文件路径
- """
- url = self.base_uri + '/webwxgetvideo?msgid=%s&skey=%s' % (msgid, self.skey)
- headers = {'Range': 'bytes=0-'}
- r = self.session.get(url, headers=headers)
- data = r.content
- fn = 'video_' + msgid + '.mp4'
- with open(os.path.join(self.temp_pwd,fn), 'wb') as f:
- f.write(data)
- return fn
- def set_remarkname(self,uid,remarkname):#设置联系人的备注名
- url = self.base_uri + '/webwxoplog?lang=zh_CN&pass_ticket=%s' \
- % (self.pass_ticket)
- remarkname = self.to_unicode(remarkname)
- params = {
- 'BaseRequest': self.base_request,
- 'CmdId': 2,
- 'RemarkName': remarkname,
- 'UserName': uid
- }
- try:
- r = self.session.post(url, data=json.dumps(params), timeout=60)
- r.encoding = 'utf-8'
- dic = json.loads(r.text)
- return dic['BaseResponse']['ErrMsg']
- except:
- return None
|