Deobfuscating OLLVM with Angr

Reference: https://bbs.kanxue.com/thread-286549.htm
Everything below is my study notes on that author's article.

Defeating control flow flattening (FLA)

Control Flow Flattening
Its main goal is to break up the program's original control flow structures (if/else, for, while, switch, and so on) to hinder static analysis.

Original code

```c
void example(int x) {
    if (x > 10) {
        printf("A\n");
    } else {
        printf("B\n");
    }
    printf("C\n");
}
```

Corresponding flowchart

```
     [Start]
        |
    [x > 10?]
     /     \
   [A]     [B]
     \     /
       [C]
        |
      [End]
```

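The core idea of control flow flattening:
split the original structured control flow into many basic blocks; use a dispatcher and a state variable to decide the order in which those blocks run; and remove the original direct jumps (such as if/else branches), replacing them with updates to the state variable plus a switch/case that performs the jump.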

Code after flattening

```c
void example(int x) {
    int state = 0;
    while (1) {
        switch (state) {
        case 0:
            if (x > 10) state = 1;
            else state = 2;
            break;
        case 1:
            printf("A\n");
            state = 3;
            break;
        case 2:
            printf("B\n");
            state = 3;
            break;
        case 3:
            printf("C\n");
            state = 4;
            break;
        case 4:
            return;
        }
    }
}
```

Corresponding flowchart

```
          [Start]
             |
        [state = 0]
             |
       [Dispatcher]
             |
         +---+---+
         |       |
     [case 0]   ...
         |
     [x > 10 ?]
      /      \
[state=1]  [state=2]
    |          |
    v          v
[case 1]   [case 2]
[print A]  [print B]
    |          |
    +----+-----+
         |
     [state=3]
         |
     [case 3]
     [print C]
         |
     [state=4]
         |
     [case 4]
     [return]
         |
       [End]
```
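To check that flattening preserves behavior, here is a small Python model of the two C functions above. It is a sketch, not OLLVM output: each function returns the lines it would print instead of calling printf, and the flattened version runs the same four cases through a `state` variable.

```python
def example_original(x):
    """Structured version: returns the lines the C code would print."""
    out = []
    if x > 10:
        out.append("A")
    else:
        out.append("B")
    out.append("C")
    return out


def example_flattened(x):
    """Flattened version: a dispatcher loop driven by a state variable."""
    out = []
    state = 0
    while True:
        if state == 0:          # case 0: the original if-condition
            state = 1 if x > 10 else 2
        elif state == 1:        # case 1: print A
            out.append("A")
            state = 3
        elif state == 2:        # case 2: print B
            out.append("B")
            state = 3
        elif state == 3:        # case 3: print C
            out.append("C")
            state = 4
        elif state == 4:        # case 4: return
            return out


for x in (5, 10, 11, 42):
    assert example_original(x) == example_flattened(x)
print(example_flattened(42))  # ['A', 'C']
```

The branch has not disappeared. It now lives inside case 0 as a choice of the next state value, and recovering that choice is what the rest of this post is about.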

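Real-world control flow flattening is far more complex than this example.
A typical standard FLA CFG consists of:

Prologue: the first basic block executed when the function starts
Main/sub dispatchers: control the execution order of the basic blocks; the prologue's successor is the main dispatcher
Pre-dispatcher: its successor is the main dispatcher
Ret block: the function exit; it has no successors
Real blocks: the original, pre-obfuscation basic blocks that contain the actual logic; their successor is the pre-dispatcher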

PS: on predecessors and successors

```
[A]   -- A is a predecessor of B --
 |
[B]
 |
[C]   -- C is a successor of B --
```

Approach:

  1. Find all real blocks
  2. Recover the original jump relationships between the real blocks
  3. Rebuild the control flow

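Finding all real blocks

  • Find the main dispatcher (loop head)
  • Find the pre-dispatcher (converge block)
  • Find the ret block
  • Find the original real blocks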

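Finding loop heads
Traverse the CFG with BFS, recording the path from the entry block to each block. If a block appears again on its own path, there is a cycle, and that block's start address is a loop head.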

```
A → B → C → D
        ↑   |
        └───┘
path = [A, B, C, D]
```

When C shows up again as a successor (of D), D → C closes a cycle, so C is a loop head.

```python
# Uses get_basic_block() and deque, defined in the full script further below
def find_loop_head(start_ea):
    loop_heads = set()
    queue = deque()                    # BFS queue of (block, path from the entry)
    block = get_basic_block(start_ea)  # basic block containing the start address
    queue.append((block, []))
    while len(queue) > 0:
        cur_block, path = queue.popleft()
        if cur_block.start_ea in path:
            loop_heads.add(cur_block.start_ea)  # block already on its own path: loop head
            continue
        path = path + [cur_block.start_ea]      # extend the path
        queue.extend((s, path) for s in cur_block.succs())  # enqueue the successors

    all_loop_heads = list(loop_heads)
    all_loop_heads.sort()  # ascending, so the main loop head (lowest address) comes first
    return all_loop_heads
```
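The path-recording BFS enumerates paths rather than blocks, so on a large flattened function its queue can grow very quickly. A common alternative, swapped in here and not what the referenced article uses, is a single depth-first search that reports the target of every back edge (an edge into a block that is still on the DFS stack). On reducible CFGs, such as a dispatcher loop, those targets are the loop heads. The sketch runs on a hypothetical dict CFG `{block: [successors]}`, so it can be tried outside IDA.

```python
def find_loop_heads_dfs(cfg, entry):
    """Iterative DFS; a successor that is still on the stack closes a loop."""
    on_stack, done = set(), set()
    heads = set()
    stack = [(entry, iter(cfg.get(entry, [])))]
    on_stack.add(entry)
    while stack:
        node, succs = stack[-1]
        succ = next(succs, None)
        if succ is None:            # all successors of node explored
            stack.pop()
            on_stack.discard(node)
            done.add(node)
        elif succ in on_stack:      # back edge node -> succ: succ heads a loop
            heads.add(succ)
        elif succ not in done:      # unvisited block: descend into it
            on_stack.add(succ)
            stack.append((succ, iter(cfg.get(succ, []))))
    return sorted(heads)


# Hypothetical flattened CFG: prologue -> dispatcher -> real blocks -> pre-dispatcher -> dispatcher
cfg = {
    0x100: [0x110],
    0x110: [0x120, 0x130, 0x150],
    0x120: [0x140],
    0x130: [0x140],
    0x140: [0x110],
    0x150: [],
}
print([hex(h) for h in find_loop_heads_dfs(cfg, 0x100)])  # ['0x110']
```

Each edge is examined once, so the cost is linear in the size of the CFG. The trade-off is that on irreducible graphs the set of back edges depends on the DFS order.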

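Finding the converge block
In standard FLA, the loop head has two predecessors: the prologue and the converge block (pre-dispatcher).
In the non-standard FLA of this sample, the loop head and the converge block share the same address.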

```python
def find_converge_addr(loop_head_addr):
    converge_addr = None
    block = get_basic_block(loop_head_addr)  # the loop head
    preds = block.preds()                    # its predecessor blocks
    pred_list = list(preds)

    if len(pred_list) == 2:  # standard OLLVM: two preds, the prologue and the converge block
        for pred in pred_list:
            tmp_list = list(pred.preds())
            if len(tmp_list) > 1:  # the pred that has several preds itself is the converge block
                converge_addr = pred.start_ea
    else:  # non-standard OLLVM: the loop head itself serves as the converge block
        converge_addr = loop_head_addr
    return converge_addr
```
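Both cases can be checked on a tiny hypothetical CFG. This sketch applies the same rule to plain dicts, building predecessor lists from successor lists. Unlike the IDA version, it returns the first matching predecessor rather than the last one.

```python
def build_preds(cfg):
    """Invert a {block: [successors]} dict into {block: [predecessors]}."""
    preds = {b: [] for b in cfg}
    for block, succs in cfg.items():
        for s in succs:
            preds.setdefault(s, []).append(block)
    return preds


def find_converge(cfg, loop_head):
    preds = build_preds(cfg)
    head_preds = preds.get(loop_head, [])
    if len(head_preds) == 2:               # standard: prologue + converge block
        for p in head_preds:
            if len(preds.get(p, [])) > 1:  # the one with several preds is the converge block
                return p
        return None
    return loop_head                       # non-standard: loop head doubles as converge block


standard = {0x100: [0x110], 0x110: [0x120, 0x130, 0x150],
            0x120: [0x140], 0x130: [0x140], 0x140: [0x110], 0x150: []}
non_standard = {0x100: [0x110], 0x110: [0x120, 0x130, 0x150],
                0x120: [0x110], 0x130: [0x110], 0x150: []}
print(hex(find_converge(standard, 0x110)))      # 0x140
print(hex(find_converge(non_standard, 0x110)))  # 0x110
```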

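Finding real blocks
Once the converge block is known, all the original real blocks can be found among its predecessors.
In non-standard FLA, the converge block's predecessors already include the prologue.
In standard FLA, the prologue has to be taken from the loop head's predecessors after removing the converge block.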

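Another point to watch: some basic blocks end with identical instructions, and the compiler factors that common tail out into a shared block. In that case the predecessors of the shared block must be taken into account too, otherwise the set of real blocks will be incomplete.
Example: 0x42288 (0x00442288) in function sub_41D08 (sub441D08)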

```python
def find_all_real_blocks(fun_ea):
    blocks = idaapi.FlowChart(idaapi.get_func(fun_ea))

    # Collect the loop heads
    loop_heads = find_loop_head(fun_ea)
    print(f"Loop head count: {len(loop_heads)}----{[hex(x) for x in loop_heads]}")
    all_real_blocks = []

    # Analyze the real blocks of each loop head
    for loop_head_addr in loop_heads:
        loop_head_block = get_basic_block(loop_head_addr)
        loop_head_preds = list(loop_head_block.preds())
        loop_head_preds_addr = [b.start_ea for b in loop_head_preds]

        converge_addr = find_converge_addr(loop_head_addr)
        real_blocks = []

        # Standard FLA: drop the converge block from the loop head's preds; what remains is the prologue
        if loop_head_addr != converge_addr:
            loop_head_preds_addr.remove(converge_addr)
            real_blocks.extend(loop_head_preds_addr)

        # The real blocks are the converge block's predecessors
        converge_block = get_basic_block(converge_addr)
        list_preds = list(converge_block.preds())
        for pred in list_preds:
            end_ea = pred.end_ea
            last_inst_ea = idc.prev_head(end_ea)
            mnem = idc.print_insn_mnem(last_inst_ea)

            size = get_block_size(pred)
            if size > 4 and "B." not in mnem:  # not a single-instruction block, not ending in B.cond
                start_ea = pred.start_ea
                mnem = idc.print_insn_mnem(start_ea)
                if mnem == "CSEL":  # shared CSEL tail: its predecessors are the real blocks
                    csel_preds = pred.preds()
                    for csel_pred in csel_preds:
                        real_blocks.append(csel_pred.start_ea)
                else:
                    real_blocks.append(start_ea)

        real_blocks.sort()  # sorted; the first entry is the prologue
        all_real_blocks.append(real_blocks)
        print("Real blocks for this loop head:", [hex(child_block_ea) for child_block_ea in real_blocks])
    # ... the ret block and sub-prologue handling follow in the full script below
```
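The filtering rule in the loop above can be tried without IDA. It skips single-instruction blocks and blocks that end in a conditional branch, and it replaces a shared CSEL tail block with its own predecessors. In this sketch the block sizes and mnemonics come from hypothetical dicts instead of being read with `idc`.

```python
def collect_real_blocks(converge, preds, size, first_mnem, last_mnem):
    """Filter the converge block's predecessors the same way the IDA loop does."""
    real = []
    for p in preds.get(converge, []):
        if size[p] <= 4 or "B." in last_mnem[p]:
            continue                       # single-jump block or conditional-branch block
        if first_mnem[p] == "CSEL":
            real.extend(preds.get(p, []))  # shared CSEL tail: its preds are the real blocks
        else:
            real.append(p)
    return sorted(real)


# Hypothetical data: 0x130 is a lone B, 0x134 ends in B.NE, 0x138 is a shared CSEL tail
preds = {0x140: [0x120, 0x130, 0x134, 0x138], 0x138: [0x200, 0x210]}
size = {0x120: 16, 0x130: 4, 0x134: 12, 0x138: 8}
first_mnem = {0x120: "MOV", 0x130: "B", 0x134: "CMP", 0x138: "CSEL"}
last_mnem = {0x120: "B", 0x130: "B", 0x134: "B.NE", 0x138: "B"}
print([hex(b) for b in collect_real_blocks(0x140, preds, size, first_mnem, last_mnem)])
# ['0x120', '0x200', '0x210']
```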

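Finding the ret block
In standard FLA, it is usually enough to look for the block with no successors.
In non-standard FLA, however, looking only at blocks without successors misses some real code (at 0x42AB0).
Single-jump blocks and sub-dispatchers also have to be taken into account: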

```
[blockA]
    ↓
[jmp_to_ret]         ← contains only a single jump instruction (4 bytes long)
    ↓
[ret_block]          ← the block that actually contains RET

[blockA]
    ↓
[child_dispatcher1]  ← sub-dispatcher 1
    ↓
[child_dispatcher2]  ← sub-dispatcher 2
    ↓
[ret_block]          ← the block that actually contains RET
```

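So the block that is finally returned is:
if there is a single-jump block -> walk back to the preceding real block
if there are sub-dispatchers -> fall back to ori_ret_block
otherwise -> the ret block found initially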
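The first rule, walking back over single-jump blocks, can be traced on the first diagram. This sketch mirrors the walk-up loop of find_ret_block on hypothetical dict data. Like that loop, it also steps up to a lone predecessor that is not a jump block, and it stops as soon as a block has zero or several predecessors.

```python
def climb_from_ret(ret_block, preds, size):
    """From the RET block, step to a lone predecessor; keep going past 4-byte jump blocks."""
    block = ret_block
    while True:
        p = preds.get(block, [])
        if len(p) != 1:       # zero or several predecessors: stop here
            return block
        block = p[0]
        if size[block] != 4:  # reached a block with real code
            return block


# blockA -> jmp_to_ret (a single B instruction) -> ret_block
preds = {"ret_block": ["jmp_to_ret"], "jmp_to_ret": ["blockA"], "blockA": ["x", "y"]}
size = {"ret_block": 8, "jmp_to_ret": 4, "blockA": 24}
print(climb_from_ret("ret_block", preds, size))  # blockA
```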

```python
import idaapi
import idc
from collections import deque  # for the BFS traversal

# Return the basic block containing ea
def get_basic_block(ea):
    func = idaapi.get_func(ea)
    if not func:
        return None
    f = idaapi.FlowChart(func)  # the function's control flow graph
    for block in f:
        if block.start_ea <= ea < block.end_ea:
            return block
    return None

# Find the loop heads (main dispatcher first)
def find_loop_head(start_ea):
    loop_heads = set()
    queue = deque()                    # BFS queue
    block = get_basic_block(start_ea)  # block containing the start address
    queue.append((block, []))
    while len(queue) > 0:
        cur_block, path = queue.popleft()
        if cur_block.start_ea in path:
            loop_heads.add(cur_block.start_ea)  # found a loop head
            continue
        path = path + [cur_block.start_ea]      # extend the path
        queue.extend((s, path) for s in cur_block.succs())  # enqueue the successors

    all_loop_heads = list(loop_heads)
    all_loop_heads.sort()  # ascending, so the main loop head comes first
    return all_loop_heads

# Find the converge block (pre-dispatcher)
def find_converge_addr(loop_head_addr):
    converge_addr = None
    block = get_basic_block(loop_head_addr)  # the loop head
    preds = block.preds()                    # its predecessor blocks
    pred_list = list(preds)

    if len(pred_list) == 2:  # standard OLLVM: two preds, the prologue and the converge block
        for pred in pred_list:
            tmp_list = list(pred.preds())
            if len(tmp_list) > 1:  # the pred with several preds of its own is the converge block
                converge_addr = pred.start_ea
    else:  # non-standard OLLVM: the loop head itself serves as the converge block
        converge_addr = loop_head_addr
    return converge_addr

# Size of a basic block in bytes
def get_block_size(block):
    return block.end_ea - block.start_ea

# Find the ret block address
def find_ret_block(blocks):
    for block in blocks:
        succs_list = list(block.succs())  # successor blocks

        end_ea = block.end_ea                     # end_ea is the address just past the last instruction
        last_inst_ea = idc.prev_head(end_ea)      # address of the block's last instruction
        mnem = idc.print_insn_mnem(last_inst_ea)  # its mnemonic

        if len(succs_list) == 0:  # no successors
            if mnem == "RET":     # and the last instruction is RET
                ori_ret_block = block

                # Walk up: while there is exactly one predecessor, step to it,
                # continuing past single-instruction (4-byte) jump blocks
                while True:
                    pred_list = list(block.preds())
                    if len(pred_list) == 1:  # exactly one predecessor
                        block = pred_list[0]
                        if get_block_size(block) == 4:  # single-instruction block
                            continue
                        else:
                            break
                    else:  # several predecessors or none
                        break

                # Sub-dispatcher check: follow successors that still have successors;
                # if more than two are found, fall back to ori_ret_block
                block2 = block
                num = 0
                i = 0
                while True:
                    i += 1
                    for succ in block2.succs():
                        if len(list(succ.succs())) != 0:
                            block2 = succ
                            num += 1
                    if num > 2:
                        block = ori_ret_block
                        break
                    if i > 2:
                        break
                return block.start_ea
    return None  # no RET block found

# The block finally returned is:
# with a single-jump block -> the preceding real block
# with sub-dispatchers     -> ori_ret_block
# otherwise                -> the ret block found initially

# Find all real blocks
def find_all_real_blocks(fun_ea):
    blocks = idaapi.FlowChart(idaapi.get_func(fun_ea))

    # Collect the loop heads
    loop_heads = find_loop_head(fun_ea)
    print(f"Loop head count: {len(loop_heads)}----{[hex(x) for x in loop_heads]}")
    all_real_blocks = []

    # Analyze the real blocks of each loop head
    for loop_head_addr in loop_heads:
        loop_head_block = get_basic_block(loop_head_addr)
        loop_head_preds = list(loop_head_block.preds())
        loop_head_preds_addr = [b.start_ea for b in loop_head_preds]

        converge_addr = find_converge_addr(loop_head_addr)
        real_blocks = []

        # Standard FLA: drop the converge block from the loop head's preds; what remains is the prologue
        if loop_head_addr != converge_addr:
            loop_head_preds_addr.remove(converge_addr)
            real_blocks.extend(loop_head_preds_addr)

        # The real blocks are the converge block's predecessors
        converge_block = get_basic_block(converge_addr)
        list_preds = list(converge_block.preds())
        for pred in list_preds:
            end_ea = pred.end_ea
            last_inst_ea = idc.prev_head(end_ea)
            mnem = idc.print_insn_mnem(last_inst_ea)

            size = get_block_size(pred)
            if size > 4 and "B." not in mnem:  # not a single-instruction block, not ending in B.cond
                start_ea = pred.start_ea
                mnem = idc.print_insn_mnem(start_ea)
                if mnem == "CSEL":  # shared CSEL tail: its predecessors are the real blocks
                    csel_preds = pred.preds()
                    for csel_pred in csel_preds:
                        real_blocks.append(csel_pred.start_ea)
                else:
                    real_blocks.append(start_ea)

        real_blocks.sort()  # sorted; the first entry is the prologue
        all_real_blocks.append(real_blocks)
        print("Real blocks for this loop head:", [hex(child_block_ea) for child_block_ea in real_blocks])

    # Append the ret block
    ret_addr = find_ret_block(blocks)
    all_real_blocks.append(ret_addr)
    print("all_real_blocks:", all_real_blocks)

    # Flatten all real block addresses into one list
    all_real_block_list = []
    for real_blocks in all_real_blocks:
        if isinstance(real_blocks, list):
            all_real_block_list.extend(real_blocks)
        else:
            all_real_block_list.append(real_blocks)

    print("\nAll real blocks collected")
    print(all_real_block_list)
    print(f"Real block count: {len(all_real_block_list)}")
    print([hex(x) for x in all_real_block_list], "\n")

    # Sub-prologue groups: everything except the ret block and the main group
    all_child_prologue_addr = all_real_blocks.copy()
    all_child_prologue_addr.remove(ret_addr)
    all_child_prologue_addr.remove(all_child_prologue_addr[0])  # drop the main prologue's group
    print("Real block addresses under sub-prologues:", all_child_prologue_addr)

    # Address of the last instruction of each sub-prologue
    all_child_prologue_last_ins_ea = []
    for child_prologue_array in all_child_prologue_addr:
        child_prologue_addr = child_prologue_array[0]
        child_prologue_block = get_basic_block(child_prologue_addr)
        child_prologue_end_ea = child_prologue_block.end_ea
        child_prologue_last_ins_ea = idc.prev_head(child_prologue_end_ea)
        all_child_prologue_last_ins_ea.append(child_prologue_last_ins_ea)

    print("Last instruction address of each sub-prologue:", all_child_prologue_last_ins_ea)

    return all_real_blocks, all_child_prologue_addr, all_child_prologue_last_ins_ea


func_ea = 0x41D08
reals = find_all_real_blocks(func_ea)
```

Output:

Loop head count: 2----['0x41dbc', '0x42290']
Real blocks for this loop head: ['0x41d08', '0x41da0', '0x41e14', '0x41e70', '0x41f0c', '0x41f64', '0x41fac', '0x4200c', '0x42058', '0x420b4', '0x420fc', '0x4212c', '0x4213c', '0x42154', '0x421b0', '0x421c4', '0x421d8', '0x421f0', '0x42208', '0x4221c', '0x4223c', '0x42a48', '0x42a68', '0x42a80', '0x42a90']
Real blocks for this loop head: ['0x42258', '0x422f8', '0x42368', '0x423dc', '0x42454', '0x424b0', '0x42518', '0x42570', '0x425b8', '0x4262c', '0x4266c', '0x426c8', '0x42718', '0x42794', '0x427d8', '0x42808', '0x42860', '0x42870', '0x428a8', '0x428c0', '0x428e4', '0x42918', '0x42930', '0x42958', '0x4299c', '0x429bc', '0x429d8', '0x429e8', '0x42a04', '0x42a1c', '0x42a2c']
all_real_blocks: [[269576, 269728, 269844, 269936, 270092, 270180, 270252, 270348, 270424, 270516, 270588, 270636, 270652, 270676, 270768, 270788, 270808, 270832, 270856, 270876, 270908, 272968, 273000, 273024, 273040], [270936, 271096, 271208, 271324, 271444, 271536, 271640, 271728, 271800, 271916, 271980, 272072, 272152, 272276, 272344, 272392, 272480, 272496, 272552, 272576, 272612, 272664, 272688, 272728, 272796, 272828, 272856, 272872, 272900, 272924, 272940], 273072]

All real blocks collected
[269576, 269728, 269844, 269936, 270092, 270180, 270252, 270348, 270424, 270516, 270588, 270636, 270652, 270676, 270768, 270788, 270808, 270832, 270856, 270876, 270908, 272968, 273000, 273024, 273040, 270936, 271096, 271208, 271324, 271444, 271536, 271640, 271728, 271800, 271916, 271980, 272072, 272152, 272276, 272344, 272392, 272480, 272496, 272552, 272576, 272612, 272664, 272688, 272728, 272796, 272828, 272856, 272872, 272900, 272924, 272940, 273072]
Real block count: 57
['0x41d08', '0x41da0', '0x41e14', '0x41e70', '0x41f0c', '0x41f64', '0x41fac', '0x4200c', '0x42058', '0x420b4', '0x420fc', '0x4212c', '0x4213c', '0x42154', '0x421b0', '0x421c4', '0x421d8', '0x421f0', '0x42208', '0x4221c', '0x4223c', '0x42a48', '0x42a68', '0x42a80', '0x42a90', '0x42258', '0x422f8', '0x42368', '0x423dc', '0x42454', '0x424b0', '0x42518', '0x42570', '0x425b8', '0x4262c', '0x4266c', '0x426c8', '0x42718', '0x42794', '0x427d8', '0x42808', '0x42860', '0x42870', '0x428a8', '0x428c0', '0x428e4', '0x42918', '0x42930', '0x42958', '0x4299c', '0x429bc', '0x429d8', '0x429e8', '0x42a04', '0x42a1c', '0x42a2c', '0x42ab0']

Real block addresses under sub-prologues: [[270936, 271096, 271208, 271324, 271444, 271536, 271640, 271728, 271800, 271916, 271980, 272072, 272152, 272276, 272344, 272392, 272480, 272496, 272552, 272576, 272612, 272664, 272688, 272728, 272796, 272828, 272856, 272872, 272900, 272924, 272940]]
Last instruction address of each sub-prologue: [270980]

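Recovering the relationships between real blocks

With all the real blocks identified, the next step is to recover the jump relationships between them.
This is where angr comes in. Running the whole function symbolically in one go risks path explosion and failure, so each real block is executed separately. Take one block from real_blocks at a time, starting with real_blocks[0], and stop as soon as execution reaches an address in the saved list of real blocks. Save the state there and record that block-to-block edge. Then take the next real block, real_blocks[1], as the new starting state and repeat.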

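Key points:

  • Use proj.hook() on the prologue's last instruction so that angr jumps straight to the real block under analysis
  • For each real block, single-step it with an angr simulation manager (simgr); if execution reaches an address in real_blocks, that block is a successor
  • Plain unconditional jumps are recorded directly. For CSEL-based conditional jumps, force each CSEL outcome in turn instead of relying on the condition flags in the current state, and simulate the true and false cases separately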
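Without angr, these points can be modelled on the flattened `example` from the beginning. Each real block runs in isolation, the state value it writes is pushed through the dispatcher, and for a CSEL both possible results are taken instead of the flags being evaluated. In this sketch the block semantics and the dispatcher are hand-written, hypothetical dicts that stand in for what angr actually executes.

```python
# Each real block ends by writing the next state: "mov" writes one constant,
# "csel" picks one of two constants depending on a flag (here: x > 10).
BLOCKS = {
    "case0": ("csel", 1, 2),  # state = (x > 10) ? 1 : 2
    "case1": ("mov", 3),
    "case2": ("mov", 3),
    "case3": ("mov", 4),
    "case4": ("ret",),
}
DISPATCH = {0: "case0", 1: "case1", 2: "case2", 3: "case3", 4: "case4"}  # the switch


def next_states(block):
    op = BLOCKS[block]
    if op[0] == "csel":
        return [op[1], op[2]]  # force the true result first, then the false result
    if op[0] == "mov":
        return [op[1]]
    return []                  # ret: no successor


def recover_edges():
    """Run each real block on its own and follow the dispatcher to the next real block."""
    return {block: [DISPATCH[s] for s in next_states(block)] for block in BLOCKS}


print(recover_edges())
```

The recovered edges are exactly the original if/else structure: case0 has two successors and the other blocks have one (or none for the return).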

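Implementation

  • Parsing the CSEL instruction and handling it specially
    e.g. csel x0, x1, x2, eq  (x0 = x1 if the EQ condition holds, otherwise x2)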
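```python
# Split the operands of e.g. "csel x0, x1, x2, eq" (insn is a capstone instruction)
def capstone_decode_csel(insn):
    operands = insn.op_str.replace(" ", "").split(",")
    dst_reg = operands[0]    # destination register
    true_reg = operands[1]   # register selected when the condition holds
    false_reg = operands[2]  # register selected otherwise
    condition = operands[3]  # condition code
    return dst_reg, true_reg, false_reg, condition
```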
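For reference, here is what a CSEL does with the flags. This flag dependency is exactly what find_state_succ sidesteps by writing either the true or the false register into the destination. The pure-Python sketch below uses the standard AArch64 condition-code table; the register values and flags in the usage are made up.

```python
def cond_holds(cond, n, z, c, v):
    """Evaluate an AArch64 condition code on the NZCV flags."""
    table = {
        "eq": z == 1, "ne": z == 0,
        "cs": c == 1, "hs": c == 1, "cc": c == 0, "lo": c == 0,
        "mi": n == 1, "pl": n == 0, "vs": v == 1, "vc": v == 0,
        "hi": c == 1 and z == 0, "ls": not (c == 1 and z == 0),
        "ge": n == v, "lt": n != v,
        "gt": z == 0 and n == v, "le": not (z == 0 and n == v),
        "al": True,
    }
    return table[cond.lower()]


def parse_csel(op_str):
    # Same split as capstone_decode_csel, but on a plain operand string
    dst, true_reg, false_reg, cond = op_str.replace(" ", "").split(",")
    return dst, true_reg, false_reg, cond


def csel(op_str, regs, flags):
    """Return a new register dict with dst = true_reg if cond holds else false_reg."""
    dst, true_reg, false_reg, cond = parse_csel(op_str)
    regs = dict(regs)
    regs[dst] = regs[true_reg] if cond_holds(cond, *flags) else regs[false_reg]
    return regs


# After "cmp w8, #5" with w8 == 3: negative, borrow, no overflow -> N=1, Z=0, C=0, V=0
flags = (1, 0, 0, 0)
regs = {"x0": 0, "x1": 0x11, "x2": 0x22}
print(hex(csel("x0, x1, x2, lt", regs, flags)["x0"]))  # 0x11
print(hex(csel("x0, x1, x2, eq", regs, flags)["x0"]))  # 0x22
```

In a flattened function the flags usually come from a comparison on symbolic inputs, so no single concrete flag value reveals both successors. Forcing each result covers both cases.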
```python
def find_state_succ(proj, base, local_state, flag, real_blocks, real_block_addr, path):
    # Only called when find_block_succ has detected a csel
    ins = local_state.block().capstone.insns[0]  # first instruction of the current block, assumed to be csel
    dst_reg, true_reg, false_reg, condition = capstone_decode_csel(ins)
    val1 = local_state.regs.get(true_reg)
    val2 = local_state.regs.get(false_reg)

    sm = proj.factory.simgr(local_state)
    sm.step(num_inst=1)  # execute the csel itself
    temp_state = sm.active[0]

    # Force one CSEL outcome, regardless of the condition flags in the state
    if flag:
        setattr(temp_state.regs, dst_reg, val1)  # true branch: dst = value of the true register
    else:
        setattr(temp_state.regs, dst_reg, val2)  # false branch: dst = value of the false register

    # Keep stepping until some state lands on a real block
    while len(sm.active):
        for active_state in sm.active:
            ins_offset = active_state.addr - base
            if ins_offset in real_blocks:
                value = path[real_block_addr]
                if ins_offset not in value:
                    value.append(ins_offset)
                return ins_offset
        sm.step(num_inst=1)
    return None  # no real block reached
```
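  • Finding the successors of a real block
    Starting from the initial state, find the state that enters the given real_block, then decide on that block whether the jump is unconditional or goes through a CSEL that needs branch handling, and write the result into path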

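Main flow:
Create a main simulation manager and go through all active states looking for the real block.
Copy that state into a new simulation manager to explore the successors, stepping it once so it is not confused with the outer state.
For an unconditional jump, if several paths reach real blocks at the same time, discard the path that leads to the ret block.
For a CSEL conditional jump: by default angr picks a side based on the condition flags in the current state. Here a semi-symbolic approach is used instead, which does not rely on the real flags. One copy of the state (the True branch) has the csel result forced to the true value, a second copy (the False branch) has it forced to the false value, and each copy is then executed until it reaches an address that belongs to real_blocks.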

```python
def find_block_succ(proj, base, func_offset, state, real_block_addr, real_blocks, path):
    msm = proj.factory.simgr(state)  # build a simulation manager

    # Locate the given real block, then copy that state to search for its successors
    while len(msm.active):
        for active_state in msm.active:
            offset = active_state.addr - base
            if offset == real_block_addr:  # reached the real block
                mstate = active_state.copy()  # copy the state for the successor search
                msm2 = proj.factory.simgr(mstate)
                msm2.step(num_inst=1)  # move one instruction into the block so it isn't confused with the outer state

                # Search for all successors of the real block
                while len(msm2.active):
                    for mactive_state in msm2.active:
                        ins_offset = mactive_state.addr - base
                        if ins_offset in real_blocks:  # no branch (unconditional jump)
                            # If at least two paths reach real blocks at once, prefer the non-ret one
                            msm2_len = len(msm2.active)
                            if msm2_len > 1:
                                tmp_addrs = []
                                for s in msm2.active:
                                    moffset = s.addr - base
                                    tmp_value = path[real_block_addr]
                                    if moffset in real_blocks and moffset not in tmp_value:
                                        tmp_addrs.append(moffset)
                                if len(tmp_addrs) > 1:
                                    print("At least two paths reached real blocks at once:", [hex(tmp_addr) for tmp_addr in tmp_addrs])
                                    ret_addr = real_blocks[len(real_blocks) - 1]
                                    if ret_addr in tmp_addrs:
                                        tmp_addrs.remove(ret_addr)
                                    ins_offset = tmp_addrs[0]
                                    print("Chosen among the simultaneous paths:", hex(ins_offset))

                            value = path[real_block_addr]
                            if ins_offset not in value:
                                value.append(ins_offset)
                            print(f"Unconditional edge: {hex(real_block_addr)}-->{hex(ins_offset)}")
                            return

                        # csel branch block
                        ins = mactive_state.block().capstone.insns[0]
                        if ins.mnemonic == 'csel':
                            state_true = mactive_state.copy()
                            state_true_succ_addr = find_state_succ(proj, base, state_true, True, real_blocks, real_block_addr, path)

                            state_false = mactive_state.copy()
                            state_false_succ_addr = find_state_succ(proj, base, state_false, False, real_blocks, real_block_addr, path)

                            if state_true_succ_addr is None or state_false_succ_addr is None:
                                print("csel error at:", hex(ins_offset))
                                print(f"Bad csel successors: {hex(real_block_addr)}-->{hex(state_true_succ_addr) if state_true_succ_addr is not None else state_true_succ_addr},"
                                      f"{hex(state_false_succ_addr) if state_false_succ_addr is not None else state_false_succ_addr}")
                                return "erro"

                            print(f"csel edges: {hex(real_block_addr)}-->{hex(state_true_succ_addr)},{hex(state_false_succ_addr)}")
                            return
                    msm2.step(num_inst=1)
                # No successor found: this is the last real block, i.e. the return block
                return
        msm.step(num_inst=1)
```
  • Main flow
    Initialize angr
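```python
import angr

# file_path (path to the target .so) and func_offset (the function's offset, e.g. 0x41D08) are set elsewhere
proj = angr.Project(file_path, auto_load_libs=False)   # load the target .so
base = proj.loader.min_addr                            # load base address
func_addr = base + func_offset                         # absolute address of the function
init_state = proj.factory.blank_state(addr=func_addr)  # initial execution state
init_state.options.add(angr.options.CALLLESS)          # skip calls (don't step into library functions) to avoid problems simulating libc
```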

Build the path dict: each key is a real block address, and each value is the list of that block's successor real blocks

```python
path = {addr: [] for addr in real_blocks}
ret_addr = real_blocks[len(real_blocks) - 1]  # the ret block is the last entry
```

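Take the last instruction of the main prologue. A hook placed there later rewrites the PC so that execution jumps straight to the chosen real block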

```python
first_block = proj.factory.block(func_addr)
first_block_insns = first_block.capstone.insns
first_block_last_ins = first_block_insns[len(first_block_insns) - 1]
```

Iterate over the real block addresses and check whether each one belongs to a sub-prologue

```python
from tqdm import tqdm  # progress bar

for real_block_addr in tqdm(real_blocks):
    if ret_addr == real_block_addr:
        continue  # the ret block has no successors to find

    prologue_block_addr = 0
    child_prologue_last_ins_ea = 0
    if len(all_child_prologue_addr) > 0:
        for index, child_prologue_array in enumerate(all_child_prologue_addr):
            if real_block_addr in child_prologue_array:
                prologue_block_addr = child_prologue_array[0] + base
                child_prologue_last_ins_ea = all_child_prologue_last_ins_ea[index]
```

Hook functions (still inside the loop body)

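```python
# (inside the for loop above)
def jump_to_address(state):
    # angr resumes at state.addr + hook length after a user hook, so subtract
    # the 4-byte ARM64 instruction size to land exactly on the real block
    state.regs.pc = base + real_block_addr - 4

def jump_to_child_prologue_address(state):
    state.regs.pc = prologue_block_addr - 4

if prologue_block_addr == 0:
    # No sub-prologue: hook the last instruction of the main prologue
    if real_block_addr != func_offset:
        proj.hook(first_block_last_ins.address, jump_to_address, first_block_last_ins.size)
else:
    # Jump into the sub-prologue first, then from its last instruction to the real block
    proj.hook(first_block_last_ins.address, jump_to_child_prologue_address, first_block_last_ins.size)
    # child_prologue_last_ins_ea is an IDA offset, so rebase it for angr
    proj.hook(base + child_prologue_last_ins_ea, jump_to_address, 4)
```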

Finally, call find_block_succ to obtain each block's successors and print the resulting control flow relationships
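The patch script in the next section expects its input as a dict of hex strings (`patch_list`). The small helpers below are an assumption, not taken from the article. One converts the integer `path` dict into that form; the other flags any block with more than two successors, which a `B.cond` + `B` pair cannot express. Because find_block_succ runs the True branch before the False branch, the first successor listed for a CSEL block is the one taken when the condition holds. That is the successor patch_branch places behind `B.cond`.

```python
def path_to_patch_list(path):
    """{offset: [successor offsets]} -> {'0x...': ['0x...', ...]} as used by patch_branch."""
    return {hex(src): [hex(dst) for dst in dsts] for src, dsts in path.items()}


def check_patch_list(patch_list):
    """Blocks with more than two successors cannot be rewritten as B.cond + B."""
    return [src for src, dsts in patch_list.items() if len(dsts) > 2]


path = {269576: [270876], 269728: [270788, 270936], 273072: []}
patch_list = path_to_patch_list(path)
print(patch_list)
# {'0x41d08': ['0x4221c'], '0x41da0': ['0x421c4', '0x42258'], '0x42ab0': []}
print(check_patch_list(patch_list))  # []
```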

Rebuilding the control flow

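Key points:
For conditional branch blocks (two successors in patch_list): find the CSEL instruction, use the keystone assembler to generate a conditional branch (B.cond) and an unconditional branch (B), and use them to replace the CSEL instruction and the block's last instruction
For unconditional jump blocks (one successor in patch_list): patch an unconditional branch (B) at the end of the block so that it jumps straight to the correct target
For return blocks (no successors in patch_list): skip them without any modification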
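It helps to know what keystone produces for these patches. The sketch below hand-encodes the A64 `B` form (26-bit word offset) and the `B.cond` form (19-bit word offset plus a 4-bit condition) from the architecture's encoding tables, along with the 4-byte ARM64 NOP. It is useful for cross-checking `ks.asm` output or diagnosing range problems, and is an illustration, not a replacement for keystone. The addresses in the usage are hypothetical.

```python
COND_CODES = {"EQ": 0, "NE": 1, "CS": 2, "HS": 2, "CC": 3, "LO": 3, "MI": 4, "PL": 5,
              "VS": 6, "VC": 7, "HI": 8, "LS": 9, "GE": 10, "LT": 11, "GT": 12, "LE": 13, "AL": 14}
ARM64_NOP = 0xD503201F


def encode_b(pc, target):
    """B <target>: 0x14000000 | signed 26-bit word offset (reach about +/-128 MB)."""
    offset = target - pc
    if offset % 4 or not -(1 << 27) <= offset < (1 << 27):
        raise ValueError("B target misaligned or out of range")
    return 0x14000000 | ((offset >> 2) & 0x3FFFFFF)


def encode_b_cond(pc, target, cond):
    """B.<cond> <target>: 0x54000000 | imm19 << 5 | cond (reach about +/-1 MB)."""
    offset = target - pc
    if offset % 4 or not -(1 << 20) <= offset < (1 << 20):
        raise ValueError("B.cond target misaligned or out of range")
    return 0x54000000 | (((offset >> 2) & 0x7FFFF) << 5) | COND_CODES[cond.upper()]


def to_le_bytes(word):
    """The 4 bytes in memory order, as patch_byte would write them."""
    return list(word.to_bytes(4, "little"))


print(hex(encode_b(0x1000, 0x1008)))             # 0x14000002
print(hex(encode_b_cond(0x1000, 0x1008, "ne")))  # 0x54000041
print([hex(b) for b in to_le_bytes(ARM64_NOP)])  # ['0x1f', '0x20', '0x3', '0xd5']
```

Note the asymmetry: B reaches far, but B.cond only spans about 1 MB, so in a very large function a B.cond to a distant real block would need a different sequence.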

```python
from collections import deque

import ida_funcs
import idaapi
import idautils
import idc
import keystone

# Keystone assembler for ARM64 (the target .so is arm64), little-endian
ks = keystone.Ks(keystone.KS_ARCH_ARM64, keystone.KS_MODE_LITTLE_ENDIAN)

ARM64_NOP = 0xD503201F  # ARM64 NOP encoding (0x90 is the x86 NOP and is not valid on ARM64)

def patch_ins_to_nop(ins):
    size = idc.get_item_size(ins)  # 4 bytes per ARM64 instruction
    for i in range(0, size, 4):
        idc.patch_dword(ins + i, ARM64_NOP)

def get_block_by_address(ea):
    func = idaapi.get_func(ea)
    blocks = idaapi.FlowChart(func)
    for block in blocks:
        if block.start_ea <= ea < block.end_ea:
            return block
    return None


def patch_branch(patch_list):
    for ea in patch_list:
        values = patch_list[ea]
        if len(values) == 0:  # no successors: almost always the return block, nothing to patch
            continue
        block = get_block_by_address(int(ea, 16))
        start_ea = block.start_ea
        end_ea = block.end_ea
        last_ins_ea = idc.prev_head(end_ea)  # block.end_ea is one past the block, so step back to its last instruction
        if len(values) == 2:  # conditional block
            flag = False
            for ins in idautils.Heads(start_ea, end_ea):  # every instruction head in the block
                if idc.print_insn_mnem(ins) == "CSEL":
                    condition = idc.print_operand(ins, 3)
                    encoding, count = ks.asm(f'B.{condition} {values[0]}', ins)  # B.cond replacing the CSEL
                    encoding2, count2 = ks.asm(f'B {values[1]}', last_ins_ea)   # B replacing the last instruction
                    for i in range(4):
                        idc.patch_byte(ins + i, encoding[i])
                    for i in range(4):
                        idc.patch_byte(last_ins_ea + i, encoding2[i])
                    flag = True
            if not flag:  # no CSEL here: it starts the next (shared) block, so patch this block's last two instructions
                ins = idc.prev_head(last_ins_ea)
                succs = block.succs()
                succs_list = list(succs)
                csel_ea = succs_list[0].start_ea
                condition = idc.print_operand(csel_ea, 3)  # the csel's condition
                encoding, count = ks.asm(f'B.{condition} {values[0]}', ins)  # B.cond in place of the second-to-last instruction
                encoding2, count2 = ks.asm(f'B {values[1]}', last_ins_ea)   # B in place of the last instruction
                try:
                    for i in range(4):
                        idc.patch_byte(ins + i, encoding[i])
                    for i in range(4):
                        idc.patch_byte(last_ins_ea + i, encoding2[i])
                except Exception as e:
                    print("patch failed at", hex(ins), e)

        else:  # unconditional block
            encoding, count = ks.asm(f'B {values[0]}', last_ins_ea)
            for i in range(4):
                idc.patch_byte(last_ins_ea + i, encoding[i])
    print("Patching done")

def find_all_useless_block(func_ea, real_blocks):
    blocks = idaapi.FlowChart(idaapi.get_func(func_ea))
    local_real_blocks = real_blocks.copy()
    useless_blocks = []
    ret_block_addr = local_real_blocks[len(local_real_blocks) - 1]
    queue = deque()
    ret_block = get_block_by_address(ret_block_addr)
    queue.append(ret_block)
    while len(queue) > 0:  # keep the blocks that follow the ret block, up to the real RET
        cur_block = queue.popleft()
        queue.extend(succ for succ in cur_block.succs())
        ret_flag = False
        for succ in cur_block.succs():
            local_real_blocks.append(succ.start_ea)
            end_ea = succ.end_ea
            last_ins_ea = idc.prev_head(end_ea)
            mnem = idc.print_insn_mnem(last_ins_ea)
            if mnem == "RET":
                ret_flag = True
        if ret_flag:
            break
    # Everything that is neither a real block nor part of the ret tail is useless
    for block in blocks:
        start_ea = block.start_ea
        if start_ea not in local_real_blocks:
            useless_blocks.append(start_ea)
    print("All useless blocks:", [hex(b) for b in useless_blocks])
    return useless_blocks

def patch_useless_blocks(func_ea, real_blocks):
    useless_blocks = find_all_useless_block(func_ea, real_blocks)
    for useless_block_addr in useless_blocks:
        block = get_block_by_address(useless_block_addr)
        start_ea = block.start_ea
        end_ea = block.end_ea

        insns = idautils.Heads(start_ea, end_ea)
        for ins in insns:
            patch_ins_to_nop(ins)
    print("Useless blocks NOPed")

patch_list = {'0x41d08': ['0x4221c'], '0x41da0': ['0x421c4', '0x42258'], '0x41e14': ['0x41fac'], '0x41e70': ['0x4213c'], '0x41f0c': ['0x42a80', '0x4200c'], '0x41f64': ['0x42ab0'], '0x41fac': ['0x42a80', '0x421d8'], '0x4200c': ['0x42a90'], '0x42058': ['0x41f64', '0x42ab0'], '0x420b4': ['0x41e70', '0x42208'], '0x420fc': ['0x42ab0', '0x421f0'], '0x4212c': ['0x42a68'], '0x4213c': ['0x4212c', '0x42ab0'], '0x42154': ['0x420b4'], '0x421b0': ['0x41fac'], '0x421c4': ['0x41f0c'], '0x421d8': ['0x42258', '0x42ab0'], '0x421f0': ['0x42ab0', '0x4223c'], '0x42208': ['0x4213c'], '0x4221c': ['0x42154', '0x4212c'], '0x4223c': ['0x41da0'], '0x42a48': ['0x42058'], '0x42a68': ['0x420fc', '0x42ab0'], '0x42a80': ['0x421d8'], '0x42a90': ['0x421b0', '0x41e14'], '0x42258': ['0x42a1c'], '0x422f8': ['0x42a48'], '0x42368': ['0x4266c'], '0x423dc': ['0x42368'], '0x42454': ['0x429d8', '0x426c8'], '0x424b0': ['0x42570', '0x42860'], '0x42518': ['0x42454'], '0x42570': ['0x42a04'], '0x425b8': ['0x428e4'], '0x4262c': ['0x429d8'], '0x4266c': ['0x42930', '0x428a8'], '0x426c8': ['0x42a2c'], '0x42718': ['0x428c0'], '0x42794': ['0x428a8', '0x42718'], '0x427d8': ['0x42918', '0x422f8'], '0x42808': ['0x427d8', '0x423dc'], '0x42860': ['0x42870'], '0x42870': ['0x42808', '0x42a48'], '0x428a8': ['0x427d8'], '0x428c0': ['0x4266c'], '0x428e4': ['0x42958', '0x42808'], '0x42918': ['0x429e8'], '0x42930': ['0x42794'], '0x42958': ['0x424b0'], '0x4299c': ['0x425b8'], '0x429bc': ['0x4299c'], '0x429d8': ['0x42a48'], '0x429e8': ['0x42518'], '0x42a04': ['0x42860'], '0x42a1c': ['0x429bc'], '0x42a2c': ['0x4262c', '0x429d8'], '0x42ab0': []}

patch_branch(patch_list)


func_ea = 0x41D08
real_blocks = [269576, 269728, 269844, 269936, 270092, 270180, 270252, 270348, 270424, 270516, 270588, 270636, 270652, 270676, 270768, 270788, 270808, 270832, 270856, 270876, 270908, 272968, 273000, 273024, 273040, 270936, 271096, 271208, 271324, 271444, 271536, 271640, 271728, 271800, 271916, 271980, 272072, 272152, 272276, 272344, 272392, 272480, 272496, 272552, 272576, 272612, 272664, 272688, 272728, 272796, 272828, 272856, 272872, 272900, 272924, 272940, 273072]
patch_useless_blocks(func_ea, real_blocks)
ida_funcs.reanalyze_function(ida_funcs.get_func(func_ea))  # refresh the function's control flow graph
print("Control flow graph refreshed")
```
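The bookkeeping in find_all_useless_block comes down to a set difference. This dict-based sketch (hypothetical addresses) keeps the real blocks plus the blocks after the ret block up to the one ending in RET, and reports everything else. Unlike the original loop, it also tracks visited blocks, so a cycle after the ret block cannot make it spin.

```python
from collections import deque


def find_useless_blocks(all_blocks, real_blocks, succs, last_mnem):
    keep = set(real_blocks)
    queue = deque([real_blocks[-1]])  # the ret block is the last real block
    seen = set()
    while queue:
        block = queue.popleft()
        if block in seen:
            continue
        seen.add(block)
        tail = succs.get(block, [])
        keep.update(tail)             # blocks after the ret block stay
        if any(last_mnem.get(s) == "RET" for s in tail):
            break                     # reached the block that actually returns
        queue.extend(tail)
    return [b for b in all_blocks if b not in keep]


all_blocks = [0x100, 0x110, 0x120, 0x130, 0x140, 0x150, 0x154]
real_blocks = [0x100, 0x120, 0x130, 0x150]  # 0x150 is the ret block
succs = {0x150: [0x154]}                    # ret block falls into the epilogue
last_mnem = {0x154: "RET"}
print([hex(b) for b in find_useless_blocks(all_blocks, real_blocks, succs, last_mnem)])
# ['0x110', '0x140']
```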

http://example.com/2025/10/05/angr_ollvm/
Author: Eleven
Published: October 5, 2025