从对抗到出洞:某金融APP 实战渗透与 Frida 反检测绕过(Rpc + Flask + AutoDecoder)

华盟原创文章投稿奖励计划

注意:本研究仅限在授权范围内进行。本文所有技术演示均在受控测试环境下完成。未经明确许可,对他人系统进行探测、篡改或信息窃取可能触犯当地法律和公司政策。若在渗透测试中发现严重漏洞,请及时通过合规渠道通知责任方并跟进修复。

frida检测

frida可以正常注入程序,但是一使用Java.use就会闪退

自动草稿

这里编译frida-bridge或者使用ZygiskFrida都可以绕过Java.use检测

编译frida-bridge可以参考这个视频

frida模块化开发,frida-compile,解决frida的java的api检测相关问题,frida-java-bridge调试_哔哩哔哩_bilibili

加载编译好的_agent.js即可绕过检测

自动草稿自动草稿

ZygiskFrida这里使用的是这个项目

https://github.com/sucsand/sucsand

自动草稿

算法分析

首先抓到的数据包是这样的,请求体和响应体都是json格式,并且以=作为键。
自动草稿

先上算法助手hook一遍,然后搜索相关的密文字符串,但是并没有找到相关联的hook结果,那大概率加解密都是放在native层,然后看到请求头中有验签的字段X-Emp-Signature,索性直接jadx大法,全局搜索X-Emp-Signature

自动草稿自动草稿

经过分析定位到initHttpRequest方法,这里签名字段是通过HmacSha1实现的

自动草稿

这里encryptHMAC通过配置决定使用国密 SM3 算法还是传入的算法配置对输入数据和密钥进行 HMAC 运算。

自动草稿

签名的算法搞定后,再去看请求体是如何加密的,一般验证签名都是在数据加密后再去做的,可以通过jadx向上找引用,这里省的看代码直接用frida打印堆栈去看

自动草稿

往上追踪定位到sendResquest方法,sendResquest中通过handleRequestBody函数来处理请求体

自动草稿

继续跟到handleRequestBody,这里直接贴出GPT给出的解释

自动草稿自动草稿自动草稿

handleRequestBody处理完请求体之后,会调用sendResquest发送请求,这里再看下sendResquest的逻辑

自动草稿

这里会再对上面handleRequestBody的结果再进行一次Base64编码,然后再拼接成json发送数据包

自动草稿

这样逻辑就比较清晰了

resultByts = HMAC(序列号 || AES(RNC + 请求体))  || 序列号 || AES(RNC + 请求体) result = Base64(resultByts) 请求体 = Base64(result) 

所以只要能拿到AESCipher.clientKey_和AESCipher.ClientIv_以及ClientHello.ServerHmacKey即可对加密后的数据进行还原并且重放,这里继续往下跟踪AESCipherClientHello代码,发现只是定义了静态变量没有进行赋值。

自动草稿自动草稿

为了找到clientKey_clientIv_,继续追踪AESAdapter.encrypt方法,发现每次重新打开APP,对应的clientKey_clientIv_都会改变,一开始想的是是不是动态向服务器去请求的clientKey_clientIv_,但是抓包并没有发现类似的请求,事情开始变得有趣了起来。

自动草稿自动草稿

clientKey_clientIv_既不是通过HTTP请求传输的,又不是硬编码在代码中的,那服务端到底是如何解密的呢?

为了搞清楚clientKey_clientIv_的生成逻辑,继续往上找AESCipher这个类的相关引用,最后发现一个可疑的方法,

自动草稿

这里的关键其实是第一行代码,其余的代码都是在做密钥的分割,然后赋值。

byte[] allSecret = PRFCipher.PRF(ms2, HMac.TLS_MD_CLIENT_SERVER_KEYIVMAC_CONST(), ms2RncRnsSeed, R2.attr.arrowHeadLength); 

所以还是得继续往上找引用,看看ms2是如何生成的,定位到handleServerKeyExchange方法,熟悉TLS流程的朋友这里就能看出来,这里的TLS握手流程中的服务器密钥交换过程非常相似。

自动草稿

继续往上找,定位到handlerServerKeyExchange函数

自动草稿

继续追handleFacilityServerHelloResponse

自动草稿

继续往上追,发现了一个可疑的请求,生成的密钥结果都是从这个请求中提取的。

自动草稿

抓包也同样看到了这个请求

自动草稿

继续向上看定位到ClientHello的构造方法

自动草稿

这里直接贴出GPT给的代码解释

自动草稿

但是由于测试时间有限,按理说这部分代码也是可以通过Python代码去解析响应然后提取clientIvclientKeyserverKeyserverIv的,但转念一想这几个参数都是静态参数,可以直接用frida去获取。

解密思路

首先用frida+rpc+flask获取clientIvclientKeyserverKeyserverIv,脚本如下

在上文编译好的_agent.js中添加需要发送到Python端的数据。

Java.perform(function ({  var ClientHello = Java.use("com.rytong.emp.net.ClientHello"); var mClientHmacKey = ClientHello.mClientHmacKey.value; var AESCipher = Java.use("com.rytong.emp.security.AESCipher"); var clientKey = AESCipher.clientKey_.value; var clientIv = AESCipher.clientIv_.value; var serverKey = AESCipher.serverKey_.value; var serverIv = AESCipher.serverIv_.value;  var result = {         clientKey: bytesToHex(Java.array('byte', clientKey)),         clientIv: bytesToHex(Java.array('byte', clientIv)),         serverKey: bytesToHex(Java.array('byte', serverKey)),         serverIv: bytesToHex(Java.array('byte', serverIv)),         clientHmacKey: bytesToHex(Java.array('byte', mClientHmacKey))     };      send(JSON.stringify(result)); }); 

Python端接收数据,并且提供对外的接口。

import frida import sys import json from flask import Flask, jsonify import os  TARGET_APP = "xxx"# 你需要改成实际包名  aes_data = { "clientKey": None, "clientIv": None, "serverKey": None, "serverIv": None, "clientHmacKey": None, }  def load_agent_script(file_path): if not os.path.exists(file_path): print(f"[!] 找不到脚本文件: {file_path}")         sys.exit(1)     with open(file_path, 'r', encoding='utf-8'as f: return f.read()  def on_message(message, data): global aes_data if message["type"] == "send": print("[*] 获取到AES信息: ", message["payload"])         aes_data.update(json.loads(message["payload"]))     elif message["type"] == "error": print("[!] 脚本错误: ", message["stack"])  def init_frida(agent_file="_agent.js"):     device = frida.get_usb_device()  try:         session = device.attach(TARGET_APP) print(f"[*] 已连接到正在运行的应用: {TARGET_APP}")     except frida.ProcessNotFoundError: print(f"[!] 未找到运行中的进程: {TARGET_APP}") print("[!] 请确保应用已经启动") return None     except Exceptionas e: print(f"[!] 连接失败: {e}") return None      agent_code = load_agent_script(agent_file)     script = session.create_script(agent_code)     script.on("message", on_message)     script.load() print("[*] 已启动 Frida hook,等待 AES 数据...") return session  app = Flask(__name__)  @app.route("/get_aes_key", methods=["GET"]) def get_aes_key(): if aes_data["clientKey"]: return jsonify({"status""ok""data": aes_data}) else: return jsonify({"status""error""msg""AES key not yet captured"})  if __name__ == "__main__":     init_frida("_agent.js")       app.run(host="0.0.0.0", port=5001, debug=False) 

运行成功后提示如下

自动草稿

然后就可以通过自定义autodecoder接口加解密脚本,来实现自动化解密。

按步骤来解密思路如下

  1. Base64解码: 首先,将接收到的字符串进行两次 Base64 解码,还原成原始的二进制数据包。
  2. 数据切割: 按照 HMAC签名 (20字节) + 序列号 (9字节) + AES加密体 的固定结构,将二进制数据包精确地切割成三部分。
  3. AES解密: 使用从Frida服务动态获取的 clientKey 和 clientIv,对 AES加密体 部分进行 AES-CBC 模式的解密。
  4. 去除填充与头部: 对解密后的数据,先执行 PKCS7 Unpadding(去除填充),然后再从数据头部去掉一个 RNC随机数 (32字节)
  5. 提取明文: 经过以上步骤后,剩余的数据就是最终可读的JSON明文。同时,脚本会保存本次解密得到的序列号RNC随机数,供后续加密时使用。

加密思路同样如此

  1. 准备明文: 将要发送的JSON明文前,先拼接上一个(复用的或新的)RNC随机数
  2. 填充与加密: 对拼接后的数据进行 PKCS7 Padding,然后使用 clientKey 和 clientIv 进行 AES-CBC 加密,得到 AES加密体
  3. 计算HMAC签名: 将一个(复用的或新的)序列号与 AES加密体 拼接,然后使用 clientHmacKey 计算其 HMAC-SHA1 签名。
  4. **数据封装:**按照 HMAC签名 + 序列号 + AES加密体 的顺序组装成最终的数据包,再对此数据包进行两次 Base64 编码,得到可以发送的字符串。

这里附上autodecoder脚本,思路仅供参考学习

# -*- coding:utf-8 -*- from flask import Flask, request, jsonify from Crypto.Cipher import AES import requests import base64 import json import binascii import chardet import traceback import hmac import hashlib import sys import os import re  HMAC_LEN = 20 SERIAL_LEN = 9 RNC_LEN = 32 DEFAULT_ENCODING = "utf-8"  # 保存最近一次“请求包”解密得到的数据,供后续加密复用 LAST_DECODE_STATE = { "serial_bytes": None,     # 9字节序列号 "rnc_bytes": None,        # RNC 前缀 "encoding": DEFAULT_ENCODING, }  FRIDA_RPC_URL = "http://127.0.0.1:5001/get_aes_key" app = Flask(__name__)  # 仅替换 JSON 字符串字面量内部的等号为 \u003d,避免替换到非字符串位置 def escape_equals_in_json_strings(s: str) -> str: # 匹配 JSON 字符串(含转义)     pattern = r'"(?:\\.|[^"\\])*"'     def repl(m):         token = m.group(0)            # 包含首尾引号         inner = token[1:-1]         inner = inner.replace('=''\\u003d') return'"' + inner + '"' return re.sub(pattern, repl, s)  def pkcs7_unpad(data: bytes) -> bytes:     pad_len = data[-1] if pad_len < 1or pad_len > AES.block_size:         raise ValueError("Invalid padding length") if data[-pad_len:] != bytes([pad_len]) * pad_len:         raise ValueError("Invalid padding bytes") return data[:-pad_len]  def pkcs7_pad(data: bytes) -> bytes:     pad_len = AES.block_size - (len(data) % AES.block_size) return data + bytes([pad_len]) * pad_len  def get_aes_keys() -> dict: try:         resp = requests.get(FRIDA_RPC_URL, timeout=3)         resp.raise_for_status()         j = resp.json() if j.get("status") != "ok":             raise Exception(j.get("msg""unknown error")) return j["data"]     except Exceptionas e:         raise Exception(f"获取AES key失败: {e}")  def verify_hmac(data_bytes: bytes, hmac_key: bytes, expect_hmac: bytes) -> bool: # 与 Java 一致:直接对原始字节做 HMAC-SHA1     calc_hmac = hmac.new(hmac_key, data_bytes, hashlib.sha1).digest() return calc_hmac == expect_hmac  def calc_hmac_java_style(data_bytes: bytes, hmac_key: bytes) -> bytes: """     与 Java 一致:直接对原始字节做 HMAC-SHA1     """ return hmac.new(hmac_key, data_bytes, hashlib.sha1).digest()  def request_decode_tls13(cipher_str: str, client_key_hex: str, client_iv_hex: str, hmac_key_hex: str,                          hmac_len=HMAC_LEN, ser_len=SERIAL_LEN, rnc_len=RNC_LEN) -> str:      first_decode = base64.b64decode(cipher_str)     second_decode = base64.b64decode(first_decode)     data = bytearray(second_decode)  # 拆包     hmac_bytes = data[:hmac_len]     serial_bytes = data[hmac_len:hmac_len + ser_len]     body_cr_bytes = data[hmac_len + ser_len:]  # AES 解密     key_bytes = binascii.unhexlify(client_key_hex)     iv_bytes = binascii.unhexlify(client_iv_hex)     cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)     decrypted = cipher.decrypt(body_cr_bytes)     decrypted = pkcs7_unpad(decrypted)  if len(decrypted) < rnc_len:         raise ValueError("解密数据长度异常")     rnc_bytes = decrypted[:rnc_len]     plaintext_bytes = decrypted[rnc_len:]      encoding = chardet.detect(plaintext_bytes).get("encoding"or"utf-8" try:         LAST_DECODE_STATE["serial_bytes"] = bytes(serial_bytes)         LAST_DECODE_STATE["rnc_bytes"] = bytes(rnc_bytes)         LAST_DECODE_STATE["encoding"] = encoding     except Exception:         pass  return plaintext_bytes.decode(encoding, errors="replace")  def response_decode(cipher_str: str, server_key_hex: str, server_iv_hex: str) -> str:      first_decode = base64.b64decode(cipher_str)     second_decode = base64.b64decode(first_decode)     body_bytes = bytearray(second_decode)     aes_cipher_data = body_bytes[HMAC_LEN:]     key_bytes = binascii.unhexlify(server_key_hex)     iv_bytes = binascii.unhexlify(server_iv_hex)     cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)     decrypted = cipher.decrypt(aes_cipher_data)     decrypted = pkcs7_unpad(decrypted)     encoding = chardet.detect(decrypted).get("encoding"or"utf-8" return decrypted.decode(encoding, errors="replace")  def request_encode_tls13(plaintext_str: str, client_key_hex: str, client_iv_hex: str, hmac_key_hex: str,                          encoding='utf-8', serial_bytes: bytes = None, rnc_bytes: bytes = None) -> str: try:         import json         parsed = json.loads(plaintext_str)         plaintext_str = json.dumps(parsed, ensure_ascii=False, separators=(','':'))  # 紧凑格式 # 将 JSON 字符串字面量内部的等号替换为 \u003d         plaintext_str = escape_equals_in_json_strings(plaintext_str) print(f"使用JSON序列化后的字符串长度: {len(plaintext_str)}")     except: print("不是有效的JSON格式,保持原样") print(f"原始字符串长度: {len(plaintext_str)}")      plaintext_bytes = plaintext_str.encode(encoding) print(plaintext_str) print(f"编码为字节后长度: {len(plaintext_bytes)}")      rnc = rnc_bytes if (rnc_bytes is not None and len(rnc_bytes) == RNC_LEN) else os.urandom(RNC_LEN)     padded = pkcs7_pad(rnc + plaintext_bytes)      key_bytes = binascii.unhexlify(client_key_hex)     iv_bytes = binascii.unhexlify(client_iv_hex)     cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)     body_cr_bytes = cipher.encrypt(padded)  # 复用最近一次解密得到的 serial,否则随机     serial = serial_bytes if (serial_bytes is not None and len(serial_bytes) == SERIAL_LEN) else os.urandom(SERIAL_LEN)  # HMAC-SHA1( serial || body_cr )     hmac_key = binascii.unhexlify(hmac_key_hex)     hmac_full = calc_hmac_java_style(serial + body_cr_bytes, hmac_key)     hmac_bytes = hmac_full[:HMAC_LEN]      packet = hmac_bytes + serial + body_cr_bytes      out = base64.b64encode(base64.b64encode(packet)).decode('ascii') return out  def calculate_signature(body: str, hmac_key_hex: str) -> str:     import base64, binascii, hmac, hashlib  try:         decoded_body = base64.b64decode(body)         hmac_key = binascii.unhexlify(hmac_key_hex)         signature = hmac.new(hmac_key, decoded_body, hashlib.sha1).digest()         signature_b64 = base64.b64encode(signature).decode('ascii') return signature_b64     except Exceptionas e: print(f"计算签名失败: {e}") return""  def update_signature_header(headers: str, signature: str) -> str: if not signature: return headers  if'X-Emp-Signature:' in headers: # 替换现有的签名         headers = re.sub(r'X-Emp-Signature:\s*[^\r\n]*', f'X-Emp-Signature: {signature}', headers) else: # 添加新的签名头         lines = headers.split('\n') if len(lines) > 0:             lines.insert(1, f'X-Emp-Signature: {signature}')             headers = '\n'.join(lines) return headers  @app.route("/decode", methods=["POST"]) def decode(): try: # 解析 body 和 dataHeaders(dataHeaders 是前端转发的原始请求头)         headers = request.form.get("dataHeaders")         body = request.form.get("dataBody")         reqresp = request.form.get('requestorresponse')  # 获取  requestorresponse 参数  可选   获取是请求还是响应包,需要勾选<请求响应不同加解密>按钮         keys = get_aes_keys()  if headers != None: # 开启了请求头加密 if reqresp == 'request':  # 请求包                 text = request_decode_tls13(body, keys["clientKey"], keys["clientIv"], keys["clientHmacKey"])                 parsed_json = json.loads(text)                 body = json.dumps(parsed_json, ensure_ascii=False, indent=2) return headers.strip() + "\r\n\r\n\r\n\r\n" + body # 返回值为固定格式,不可更改 必需必需必需,共四个\r\n  if reqresp == 'response':  # 响应包                 text = response_decode(body, keys["serverKey"], keys["serverIv"])                 parsed_json = json.loads(text)                 body = json.dumps(parsed_json, ensure_ascii=False, indent=2) return headers.strip() + "\r\n\r\n\r\n\r\n" + body  # 返回值为固定格式,不可更改 必需必需必需,共四个\r\n  return  body  # 返回值为固定格式,不可更改 必需必需必需      except Exceptionas e:         traceback.print_exc() return jsonify({"status""error""msg": str(e)}), 500  @app.route("/encode", methods=["POST"]) def encode(): try:         headers = request.form.get("dataHeaders")         body = request.form.get("dataBody")         reqresp = request.form.get('requestorresponse')         keys = get_aes_keys()         serial_bytes = LAST_DECODE_STATE.get("serial_bytes")         rnc_bytes = LAST_DECODE_STATE.get("rnc_bytes")         encoding = LAST_DECODE_STATE.get("encoding", DEFAULT_ENCODING) print("serial_bytes:", serial_bytes.hex()) print("rnc_bytes:", rnc_bytes.hex())          cipher_str = request_encode_tls13(             body,             keys["clientKey"],             keys["clientIv"],             keys["clientHmacKey"],             encoding=encoding,             serial_bytes=serial_bytes,             rnc_bytes=rnc_bytes         )  # 将cipher_str封装成JSON格式的字符串         cipher_json_str = json.dumps({"=": cipher_str}, ensure_ascii=False)  if headers is not None: # 计算并更新签名头             signature = calculate_signature(cipher_str, keys["clientHmacKey"])             headers = update_signature_header(headers, signature)             result = headers.strip() + "\r\n\r\n\r\n\r\n" + cipher_json_str else:             result = cipher_json_str  return result      except Exceptionas e:         traceback.print_exc() return jsonify({"status""error""msg": str(e)}), 500 if __name__ == '__main__':     app.config['JSON_AS_ASCII'] = False     app.run(host="0.0.0.0", port=5002, debug=True) 

最终效果

Burpsuite的各项功能均正常使用,在Proxy模块可通过autoDecoder板块查看明文数据包

自动草稿自动草稿

Repeater等模块均可正常使用

自动草稿

即可开始愉快的测试~


文章来源:奇安信攻防社区

本文来源奇安信攻防社区,经授权后由华盟君发布,观点不代表华盟网的立场,转载请联系原作者。

发表回复