Rebuilding a cleaned & working binary (Re150 part 2)

This article is the last part of the analysis of the Re150 GreHack 2015 challenge. It will focus on how to re-assemble a cleaned up version of this challenge using Miasm.

For reference, see the first part: GreHack 2015 Re150 challenge: as painless as possible.

This analysis is based on Miasm revision 4eceb2b.

Retrieving all basic blocks

In the previous article, a recurrent deciphering pattern was identified. As we first want to retrieve every basic block (in order to clean and re-assemble them), we need to get past these patterns.

The methodology we used before, using the result of an execution given a specific input, has at least two drawbacks:
  • only one execution path is considered and then deciphered;
  • a valid input to reach this path is needed.

We could find several inputs (actually, three) which trigger different execution paths, execute them and merge the resulting control flow graphs.

To introduce other Miasm APIs, and because this solution seems cleaner to me, we will instead instrument the disassembly engine to partially execute deciphering patterns as they appear.

This way, new patterns will appear and will be “resolved” on the fly. In addition, if two branches contain the pattern, they will both be explored - there is no need for a specific input.

Disassembly callback

We will base our script on the standard, already detailed, disassembly code snippet.

from miasm2.analysis.binary import Container
from miasm2.analysis.machine import Machine

cont = Container.from_stream(open("dump.bin"))
bin_stream = cont.bin_stream
entry_point = cont.entry_point

machine = Machine(cont.arch)
mdis = machine.dis_engine(bin_stream)
blocks = mdis.dis_multibloc(cont.entry_point)
open("cfg.dot", "w").write(blocks.dot())

In order to instrument the disassembly engine, we will register a callback. This mechanism is illustrated in the example example/disasm/callback.py.

Basically, a method cb_decrypt is created. Its signature is composed of optional arguments:
  • mn: mnemonic instance for the current architecture (the one of machine.mn)
  • attrib: current attribute of the architecture, for instance 32 in x86_32
  • pool_bin: bin_stream of the binary being disassembled
  • cur_bloc: the asm_bloc freshly disassembled
  • offsets_to_dis: set of addresses to disassemble after the current block
  • symbol_pool: contains all symbols created, as block labels

The callback is then registered and automatically called with:

mdis.dis_bloc_callback = cb_decrypt
blocks = mdis.dis_multibloc(entry_point)
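
To make the mechanism concrete without Miasm, here is a minimal sketch of a worklist-based disassembly loop that calls a hook after each block. The names toy_dis_multibloc and stop_at_0x30, as well as the toy graph, are hypothetical and only mimic the shape of the callback: the hook receives the current block and the mutable set of addresses still to be disassembled.

```python
# Toy model of a recursive-descent disassembler with a per-block callback.
# Everything here is hypothetical and only mimics the shape of the hook.

def toy_dis_multibloc(successors, entry, callback=None):
    """Explore `successors` (dict: address -> list of next addresses)."""
    todo, done = {entry}, []
    while todo:
        addr = todo.pop()
        if addr in done:
            continue
        done.append(addr)
        offsets_to_dis = set(successors.get(addr, []))
        if callback is not None:
            # The callback may inspect the block and edit the pending set
            callback(cur_bloc=addr, offsets_to_dis=offsets_to_dis)
        todo.update(offsets_to_dis)
    return sorted(done)

def stop_at_0x30(cur_bloc, offsets_to_dis, **kwargs):
    # Pretend 0x30 ends the program: do not follow its successors
    if cur_bloc == 0x30:
        offsets_to_dis.clear()

graph = {0x10: [0x20, 0x30], 0x20: [0x30], 0x30: [0x40], 0x40: []}
without_cb = toy_dis_multibloc(graph, 0x10)
with_cb = toy_dis_multibloc(graph, 0x10, stop_at_0x30)
```

With the hook installed, block 0x40 is never visited, because the callback emptied the set of pending addresses when it saw block 0x30.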

Pattern detection on the fly

We now have to detect & execute the aforementioned pattern. As we work with the disassembly engine, one block at a time, we will:
  1. detect the beginning and the end of the pattern when reaching its head block
  2. execute the pattern when its end block has been disassembled

For this purpose, we declare the global to_exec, a dictionary associating the pattern’s end block address with the corresponding head block address.

The pattern head is detected with the following heuristic:

# Detect pattern head
if not (len(cur_bloc.lines) >= 5 and
        cur_bloc.lines[0].name == "PUSH" and
        cur_bloc.lines[1].name == "MOV" and
        cur_bloc.lines[-1].name.startswith("J") and
        cur_bloc.lines[-1].name != "JMP"):
    return

# Detection of the pattern, remember the start addr
to_exec[cur_bloc.get_next()] = cur_bloc.label.offset

Indeed, the first block is five lines long when first disassembled. It only takes the shape we know after being split, because its final jump targets an address inside the block itself (see the blocks.apply_splitting method for more details). That is why, the first time we encounter it, it is five lines long and its direct successor (the next constraint, i.e. the address reached when the jump is not taken) is the pattern’s end block.

Thus, we can register it in to_exec.
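
The heuristic can be tried in isolation on stand-in objects. In the sketch below, Instr, Block, make_block and the instruction sequences are hypothetical (they are not Miasm types, and the mnemonics are made up for the test); only the five conditions mirror the check above.

```python
from collections import namedtuple

# Hypothetical stand-ins for disassembled instructions and blocks
Instr = namedtuple("Instr", ["name"])
Block = namedtuple("Block", ["offset", "lines", "next_offset"])

def is_pattern_head(block):
    """Same test as the heuristic above, on the stand-in types."""
    lines = block.lines
    return (len(lines) >= 5 and
            lines[0].name == "PUSH" and
            lines[1].name == "MOV" and
            lines[-1].name.startswith("J") and
            lines[-1].name != "JMP")

def make_block(offset, names, next_offset):
    return Block(offset, [Instr(n) for n in names], next_offset)

head = make_block(0x1000, ["PUSH", "MOV", "XOR", "INC", "JNZ"], 0x1010)
plain = make_block(0x2000, ["PUSH", "MOV", "ADD", "CMP", "JMP"], None)
short = make_block(0x3000, ["PUSH", "MOV", "JNZ"], 0x3008)

to_exec = {}
for blk in (head, plain, short):
    if is_pattern_head(blk):
        # Key: fall-through successor (pattern end), value: head address
        to_exec[blk.next_offset] = blk.offset
```

Only the first block is registered: the second ends with an unconditional JMP and the third is too short.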

Pattern execution

To execute the pattern, we use an ELF Sandbox.

First, we need to link our fresh work with the Sandbox, in order to work on the same memory instance.

To this end, we just patch the above disassembly code snippet to use the Sandbox bin_stream:

from miasm2.analysis.sandbox import Sandbox_Linux_x86_32

# Parse arguments
parser = Sandbox_Linux_x86_32.parser(description="ELF sandboxer")
parser.add_argument("filename", help="ELF Filename")
options = parser.parse_args()

# Create sandbox
sb = Sandbox_Linux_x86_32(options.filename, options, globals())

machine = sb.machine
bin_stream = sb.jitter.bs
entry_point = sb.entry_point

mdis = machine.dis_engine(bin_stream)
to_exec = {}
def cb_decrypt(cur_bloc, offsets_to_dis, *args, **kwargs):
    ...

mdis.dis_bloc_callback = cb_decrypt
blocks = mdis.dis_multibloc(entry_point)

Now, we use the sb instance to run each detected pattern, on the fly (otherwise, the disassembly engine will need to be re-called to detect further patterns, which is very inefficient):
  1. we place a breakpoint stopping the execution at the end of the pattern
  2. we start the emulation from the pattern’s head address

def cb_decrypt(cur_bloc, offsets_to_dis, *args, **kwargs):
    start_addr = to_exec.get(cur_bloc.label, None)
    if start_addr:
        # Pattern detected, exec it
        print "Match on", hex(start_addr)
        # Reset exception, in case of disassembly error
        sb.jitter.vm.set_exception(0)
        # Place a breakpoint at the end of the pattern
        bp_addr = cur_bloc.lines[-1].offset
        sb.jitter.add_breakpoint(bp_addr, lambda x: False)
        # Run the emulation from the head
        sb.run(start_addr)
        # Sanity check
        assert sb.jitter.pc == bp_addr
    ...

If we launch the script:

$ python -i get_all_blocks.py reverseMe

Match on 0x8048e78
Match on 0x8048a45
Match on 0x8048e91
Match on 0x8048f0e
Match on 0x8049774
Match on 0x804a025
Match on 0x8049085
...
>>> len(blocks)  # Number of blocks disassembled
473
>>> len(to_exec)  # Number of patterns detected
152
>>> len(blocks) - (len(to_exec) * 3)  # Number of relevant blocks
17

Exit detection

Looking at the result, one can note a few weird blocks at the end, as illustrated on the following graph:

../../../_images/re150_rebuild_truncated_cfg.svg

CFG (truncated) of reverseMe main, with pattern executed

For instance, there is a jump to 0xf51a21ad. Actually, the disassembly engine should stop after the block 0x804b270: it’s a syscall to sys_exit.

We can tell the engine to stop when an INT 0x80 is hit, but doing this, we will actually stop after an earlier call to sys_write.

We therefore modify our callback to retrieve the syscall number (in EAX), and stop only on sys_exit (1). To do this, we symbolically execute each block ending with an INT:

# Requires: from miasm2.ir.symbexec import symbexec
if cur_bloc.lines and cur_bloc.lines[-1].name == "INT":
    # In case of exit interruption, do not continue to disasm
    ## Symbol exec the current block to get EAX value
    ira = machine.ira()
    ira.add_bloc(cur_bloc)
    symb = symbexec(ira,
                    ira.arch.regs.regs_init)
    symb.emul_ir_bloc(ira, cur_bloc.label.offset)
    intnb = symb.symbols[ira.arch.regs.EAX]
    if int(intnb.arg) == 1:
        # sys_exit, do not disassemble successors
        cur_bloc.bto.clear()
        offsets_to_dis.clear()
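
The idea behind this check, tracking what EAX holds when the INT is reached, can be illustrated with a much cruder stand-in for symbolic execution. The sketch below is an assumption-laden toy: blocks are lists of (mnemonic, dst, src) tuples, only three instructions are understood, and syscall_number is a hypothetical helper, not a Miasm function.

```python
def syscall_number(block):
    """Track constant register values through (mnemonic, dst, src) tuples.

    Returns the constant held by EAX when the trailing INT is reached,
    or None if it is not a known constant.
    """
    regs = {}
    for mnemonic, dst, src in block:
        if mnemonic == "MOV":
            # A source is either an immediate (int) or a register name
            regs[dst] = src if isinstance(src, int) else regs.get(src)
        elif mnemonic == "XOR" and dst == src:
            regs[dst] = 0
        elif mnemonic == "INC":
            regs[dst] = None if regs.get(dst) is None else regs[dst] + 1
        elif mnemonic == "INT":
            return regs.get("EAX")
        else:
            # Unknown instruction: forget what we knew about dst
            regs[dst] = None
    return None

SYS_EXIT, SYS_WRITE = 1, 4  # Linux x86_32 syscall numbers

exit_block = [("XOR", "EAX", "EAX"), ("INC", "EAX", None), ("INT", 0x80, None)]
write_block = [("MOV", "EBX", 1), ("MOV", "EAX", 4), ("INT", 0x80, None)]
```

A disassembly callback built on such a helper would clear the successors only when the result equals SYS_EXIT, and keep going for SYS_WRITE.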

Fixes and cleaning

At this stage, we have the full CFG, without any extra blocks. But we still have our patterns.

As before (see part 1), we will use MatchGraph to remove the patterns. But this time, we also patch jump offsets to keep assembler consistency: when linking a pattern’s predecessors with its successors, if the predecessor ends with a jump, set its destination to the successor label.

# sol, matcher, ... come from part 1
def block_merge(dgs, graph):
    """Remove the pattern defined in matcher
    @dgs: instance of DiGraphSimplifier (for recursive call purpose)
    @graph: graph to simplify
    """
    for sol in matcher.match(graph):
        successors = graph.successors(sol[end])
        for pred in graph.predecessors(sol[dad]):
            for succ in successors:
                constraint = graph.edges2constraint[(pred, sol[dad])]
                # Patch jump
                last_instr = pred.lines[-1]
                if last_instr.breakflow() and last_instr.dstflow():
                    ## Expr are immutable, so build a new arg list
                    new_arg = []
                    for arg in last_instr.args:
                        if expr_is_label(arg):
                            # Link pred -> succ, the new destination
                            new_arg.append(ExprId(succ.label, arg.size))
                        else:
                            new_arg.append(arg)
                    last_instr.args = new_arg
                # Update links
                graph.add_edge(pred, succ, constraint)

        # Remove matched blocks (and associated edges) from the graph
        for node in sol.itervalues():
            graph.del_node(node)

As in part 1, we also apply the bbl_simplifier pass to merge spaghetti blocks. We end with a cleaned up CFG, very similar to the one of part 1 but this time:
  • it is complete, i.e. every path is represented;
  • jump offsets are valid, effectively pointing to their successors.

../../../_images/re150_rebuild_cleanedcfg.svg

Fixed and cleaned up CFG of reverseMe main
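
The relinking step can also be sketched on a plain dictionary of edges, independently of MatchGraph. Here remove_chain and the node names are hypothetical; the graph is a dict mapping (source, destination) to a constraint, and the matched pattern is given as an ordered list of nodes.

```python
def remove_chain(edges, chain):
    """Drop the nodes of `chain` (head first, tail last) and reconnect.

    `edges` maps (src, dst) to a constraint string ("next" or "to").
    Every edge pred -> head becomes pred -> succ for each successor of tail,
    keeping the constraint of the original pred -> head edge.
    """
    head, tail = chain[0], chain[-1]
    inside = set(chain)
    preds = [(s, c) for (s, d), c in edges.items()
             if d == head and s not in inside]
    succs = [d for (s, d) in edges if s == tail and d not in inside]
    # Keep only the edges that do not touch the removed nodes
    kept = {(s, d): c for (s, d), c in edges.items()
            if s not in inside and d not in inside}
    for pred, constraint in preds:
        for succ in succs:
            kept[(pred, succ)] = constraint
    return kept

cfg = {
    ("A", "P1"): "to",     # A jumps into the pattern
    ("P1", "P1"): "to",    # deciphering loop
    ("P1", "P2"): "next",
    ("P2", "P3"): "next",
    ("P3", "B"): "next",   # pattern falls through to the real code
}
cleaned = remove_chain(cfg, ["P1", "P2", "P3"])
```

Note that this toy only rewires edges. The real pass must also patch the jump operand of the predecessor, as done in block_merge, otherwise the assembler would still emit the old destination.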

Re-assembling

Let’s rebuild a cleaned up and working binary!

To re-assemble, we will need “usable space” in the binary, that is to say a place to land. Two solutions:
  • we can add a custom section containing our code, and modify the entry point to jump on it;
  • as we do not care anymore about the previous code, we can override it.

To illustrate the Miasm API, we arbitrarily choose the second option.

Address intervals used by overridable blocks can be obtained through block.get_range(). We use a Miasm utility, miasm2.core.interval::interval, which represents a set of non-overlapping integer intervals.

>>> from miasm2.core.interval import interval
>>> dinterval = interval(blk.get_range() for blk in blocks)
>>> dinterval
[0x8048081 0x8048EA6] U [0x8048EAA 0x8049EC3] U [0x8049EC7 0x804B089] U [0x804B08D 0x804B109] U [0x804B110 0x804B2D3] U [0x804B2E0 0x804B2FA]
>>> dinterval_full = interval([dinterval.hull()])
>>> dinterval_full
[0x8048081 0x804B2FA]

There are a few gaps between our ranges, composed of useless bytes: we will use the hull of these intervals.
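
For readers unfamiliar with the notion, here is a minimal pure-Python sketch of what a union of inclusive ranges and its hull compute. merge_intervals and hull are hypothetical helpers, not the Miasm implementation; in particular, merging adjacent ranges is a choice made for this sketch.

```python
def merge_intervals(ranges):
    """Union of inclusive integer ranges as a sorted, non-overlapping list."""
    merged = []
    for start, stop in sorted(ranges):
        if merged and start <= merged[-1][1] + 1:
            # Overlapping or adjacent: extend the previous interval
            merged[-1] = (merged[-1][0], max(merged[-1][1], stop))
        else:
            merged.append((start, stop))
    return merged

def hull(intervals):
    """Smallest single interval covering every interval of the list."""
    return (min(s for s, _ in intervals), max(e for _, e in intervals))

ranges = [(0x10, 0x1F), (0x20, 0x2F), (0x18, 0x22), (0x40, 0x4F)]
merged = merge_intervals(ranges)
```

The hull deliberately swallows the gap between 0x2F and 0x40, which is exactly what we want when the bytes in the gaps are of no use.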

Now that we have the range, we need to fill it with our blocks. We have a few constraints:
  • the entry block must be at the entry point address
  • blocks with a next constraint (as the one following a JNZ) between them must be concatenated in the resulting assembly

Fortunately, Miasm knows how to deal with this kind of constraints.

Think about a board with clusters of sticky-notes. You can move a whole cluster only by moving one of the sticky notes (-> pinning it to an absolute coordinate) and the rest of notes will follow accordingly (-> “next” constraints).

As with sticky notes, we then “un-pin” every block and “re-pin” only the head to the entry point offset:

head = blocks_final.heads()[0]
for label in mdis.symbol_pool.items:
    mdis.symbol_pool.del_label_offset(label)
mdis.symbol_pool.set_offset(head.label, entry_point)
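
The sticky-note analogy can be turned into a few lines of code. This is a deliberately naive sketch, not how the Miasm assembler resolves addresses: layout is a hypothetical helper, block names and sizes are invented, and only "next" chains starting from a pinned block are placed.

```python
def layout(sizes, next_of, pinned):
    """Assign an address to every block reachable through "next" links.

    sizes:   block name -> size in bytes
    next_of: block name -> block that must directly follow it
    pinned:  block name -> absolute address (the "pins")
    """
    addresses = {}
    for name, addr in pinned.items():
        # Walk the cluster: each follower starts where its predecessor ends
        while name is not None:
            addresses[name] = addr
            addr += sizes[name]
            name = next_of.get(name)
    return addresses

sizes = {"entry": 5, "check": 7, "fail": 3}
next_of = {"entry": "check", "check": "fail"}
placed = layout(sizes, next_of, {"entry": 0x8048081})
```

Pinning only the entry block is enough to fix the address of everything chained to it; blocks outside such a chain remain free to be placed anywhere in the destination interval.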

We are now able to assemble our code:

from miasm2.core.asmbloc import asm_resolve_final

# blocks_final is our cleaned and fixed CFG
patches = asm_resolve_final(mdis.arch, blocks_final, mdis.symbol_pool,
                            dst_interval=dinterval_full)

The resulting patches dictionary associates an offset to the corresponding assembled bytes. For instance, patches[0x8048081] = '\xbb\x01\x00\x00\x00'.

Finally, we copy reverseMe to a new file, reverseMe_clean.bin, which we patch according to patches. We also overwrite the bytes that are no longer needed with zeros. The difference between a virtual address and the corresponding file offset is 0x8048000 (computed thanks to Elfesteem), so we end up with:

import shutil

new_filename = "%s_clean.bin" % options.filename
shutil.copy(options.filename, new_filename)

# Use Elfesteem to translate the address to the corresponding offset
section_ep = sb.elf.getsectionbyvad(entry_point).sh
offset_start = section_ep.addr - section_ep.offset

with open(new_filename, "r+b") as fdesc:
    # Clean older values
    for start, stop in dinterval.intervals:
        ## Calc offset
        fdesc.seek(start - offset_start)
        fdesc.write("\x00" * (stop - start + 1))

    # Write new values
    for offset, data in patches.iteritems():
        fdesc.seek(offset - offset_start)
        fdesc.write(data)
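
The address arithmetic is easy to get wrong by one, so here is the same logic applied to an in-memory buffer instead of a file. apply_patches is a hypothetical helper and the 0x100-byte image is made up; the base value and the patch bytes are the ones quoted above.

```python
def apply_patches(image, base, clean_range, patches):
    """Zero `clean_range` then write `patches` into `image` (a bytearray).

    base:        virtual address minus file offset for the patched section
    clean_range: inclusive (start, stop) virtual addresses to blank
    patches:     virtual address -> bytes to write
    """
    start, stop = clean_range
    size = stop - start + 1
    image[start - base:start - base + size] = b"\x00" * size
    for vaddr, data in patches.items():
        offset = vaddr - base
        image[offset:offset + len(data)] = data
    return image

BASE = 0x8048000  # value quoted above for reverseMe
image = bytearray(b"\xcc" * 0x100)
apply_patches(image, BASE, (0x8048081, 0x804808F),
              {0x8048081: b"\xbb\x01\x00\x00\x00"})
```

Because the range is inclusive, stop - start + 1 bytes are blanked, and the bytes just before and just after the range are left untouched.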

And TADA!

A simpler binary to analyze (close to the original one?):

../../../_images/re150_rebuild_final_ida.png

CFG of reverseMe_clean.bin main in IDA, with strings and easily-catchable flag

And still working!

$ ./reverseMe
Format
$ ./reverseMe_clean.bin
Format
$ ./reverseMe "You know nothing, JS"
Wrong!
$ ./reverseMe_clean.bin "You know nothing, JS"
Wrong!
$ ./reverseMe 'Alph4t3stf0rc3mille!!!!!!!!!'
Well done!
$ ./reverseMe_clean.bin 'Alph4t3stf0rc3mille!!!!!!!!!'
Well done!

Bonus

Without going too deep into the details, we can easily ask Miasm to solve the challenge for us using Microsoft z3, an SMT solver.

A future article will be fully dedicated to interaction between Miasm and SMT solvers, so this is only a teaser.

Our script will try to reach every basic block at least once, and generate a valid input for each of them. This is similar to (and far more naive than) what Triton / KLEE / Angr / … are doing. A script doing this is already present in the examples (expression/solve_condition_stp.py) but here, we write a simpler version based on z3.

First, we need a symbolic execution engine which stops on sys_exit. We re-use the same idea as before in a sub-class of symbexec:

class SbExecStopINT(symbexec):

    def emul_ir_blocs(self, myir, ad, lbl_stop=None, step=False):
        while True:
            # Specific code
            if (isinstance(self.symbols[myir.arch.regs.interrupt_num], ExprInt) and
                int(self.symbols[myir.arch.regs.interrupt_num].arg) == 0x80):
                return False
            ################
            b = myir.get_bloc(ad)
            if b is None:
                break
            if b.label == lbl_stop:
                break
            ad = self.emulbloc(b, step=step)
        return ad

We also need the common pattern to load a binary:

cont = Container.from_stream(open(sys.argv[1]))
machine = Machine(cont.arch)
mdis = machine.dis_engine(cont.bin_stream)
blocks = mdis.dis_multibloc(cont.entry_point)

# Translation to Miasm IR
ira = machine.ira()
for block in blocks:
    ira.add_bloc(block)

Then, we define a state as:
  • the address of the current block
  • a SbExecStopINT instance with current symbol values
  • a set of constraints on these symbols to reach the current block

from collections import namedtuple
State = namedtuple("State", ["addr", "symbexec", "constraints"])

Our algorithm takes a state as input, checks its satisfiability and generates one new state for each available successor.

The initial state is the state at the entry point. We define a custom ExprId, arg1, which is an alias for the input (for readability).

arg1 = ExprId("arg1")

symb = SbExecStopINT(ira, ira.arch.regs.regs_init)
# arg1 = @32[ESP + 8]
symb.symbols[ExprMem(ExprId("ESP_init") + ExprInt32(8), 32)] = arg1
# Initial state, at entry without any constraint
# Implementation detail: frozenset are used to have hashable States
init_state = State(cont.entry_point, symb, frozenset())

The magic happens through translators (miasm2.ir.translators, example/expression/expr_translate.py), which translate a Miasm IR element into another representation.

Here, we obtain a translator for z3:

from miasm2.ir.translators import Translator
trans = Translator.to_language("z3")

We can now write our main algorithm. First, here is a worklist code skeleton:

init_state = State(cont.entry_point, symb, frozenset())
todo = set([init_state])
done = set()

while todo:
    state = todo.pop()
    if state in done:
        continue
    done.add(state)

Then, we check the reachability of the state:

# Check reachability
print "Check %s" % state.addr
solver = z3.Solver()
for cons in state.constraints:
    solver.add(trans.from_expr(cons))
if solver.check() == z3.unsat:
    # Unreachable
    continue

Then, we execute the current block (working on a copy of symbols):

sbcpy = SbExecStopINT(ira, {})
sbcpy.symbols = state.symbexec.symbols.copy()

addr = sbcpy.emul_ir_blocs(ira, state.addr)

If we reached a sys_exit, SbExecStopINT will return False. This may be the expected solution.

if addr is False:
    check_for_solution(sbcpy)
    continue

Otherwise, we continue with our successors. Their addresses and the associated local constraints are obtained with possible_values (for more details, see PR #337). We add them to the constraints of the current state:

for out in possible_values(addr):
    constraints = set(state.constraints)
    constraints.update(constraint.to_constraint()
                       for constraint in out.constraints)
    todo.add(State(out.value, sbcpy, frozenset(constraints)))
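
The overall exploration loop can be exercised without Miasm or z3. The sketch below is a simplified stand-in under stated assumptions: the program is a hypothetical dictionary of branches, a state carries only an address and a frozenset of constraints (no symbolic engine), and a brute-force search over one input byte replaces the SMT solver.

```python
from collections import namedtuple

State = namedtuple("State", ["addr", "constraints"])

# Hypothetical program: address -> list of (condition, successor)
# A condition is a (name, predicate) pair over one input byte `x`.
IS_A = ("x == 'A'", lambda x: x == ord("A"))
NOT_A = ("x != 'A'", lambda x: x != ord("A"))
IS_B = ("x == 'B'", lambda x: x == ord("B"))
PROGRAM = {
    "entry": [(IS_A, "good"), (NOT_A, "bad")],
    "good": [(IS_B, "dead")],   # contradicts x == 'A'
    "bad": [], "dead": [],
}

def find_input(constraints):
    """Brute-force stand-in for the solver: a byte meeting every constraint."""
    for x in range(256):
        if all(pred(x) for _, pred in constraints):
            return x
    return None

todo, done, reached = {State("entry", frozenset())}, set(), {}
while todo:
    state = todo.pop()
    if state in done:
        continue
    done.add(state)
    witness = find_input(state.constraints)
    if witness is None:
        continue  # unreachable: drop the state
    reached[state.addr] = witness
    for cond, succ in PROGRAM[state.addr]:
        todo.add(State(succ, state.constraints | {cond}))
```

The block named "dead" is never reported, since its accumulated constraints cannot be satisfied together. Using frozenset for the constraints is what makes states hashable, so they can live in the todo and done sets.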

Finally, we just have to write check_for_solution:

def get_char(model, idx, trans):
    """Retrieve the input character number @idx of solution @model"""
    addr = ExprMem(arg1 + ExprInt32(idx), 8)
    return chr(model.eval(trans.from_expr(addr)).as_long())

def check_for_solution(sbcpy):
    # Get the message printed by sys_write
    msg_addr = int(sbcpy.symbols[ira.arch.regs.ECX].arg)
    lgth = int(sbcpy.symbols[ira.arch.regs.EDX].arg)
    msg = cont.executable.virt.get(msg_addr, msg_addr + lgth).strip()
    print "REACH INT, message: %s" % msg

    if msg_addr == 0x804B30C:
        # Solution found if message is "Well done!"
        print "SOLUTION FOUND:",

        # Model the solution
        mod = solver.model()

        # Print solution
        cur_char, idx, out = get_char(mod, 0, trans), 0, []
        while cur_char != "\x00":
            out.append(cur_char)
            idx += 1
            cur_char = get_char(mod, idx, trans)
        print "".join(out)
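
The printing loop reads the model one byte at a time until it meets a NUL terminator. The same logic is shown here on a plain dictionary standing in for the solver model; read_c_string and the limit parameter are hypothetical additions, the latter guarding against a model that never yields a terminator.

```python
def read_c_string(byte_at, limit=256):
    """Collect characters from `byte_at(idx)` until a NUL byte is met.

    `limit` bounds the loop in case no terminator is ever returned.
    """
    out = []
    for idx in range(limit):
        value = byte_at(idx)
        if value == 0:
            break
        out.append(chr(value))
    return "".join(out)

# Hypothetical model: index -> byte value, unspecified bytes default to 0
model = {0: 0x41, 1: 0x6C, 2: 0x70, 3: 0x68}
recovered = read_c_string(lambda idx: model.get(idx, 0))
```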

And TADA (again)!

$ time python find_all.py reverseMe_clean.bin
Check 134516344
Check 0x8048E82
Check 0x8048099
REACH INT, message: Wrong!
...
REACH INT, message: Format
...
Check 0x8048F27
REACH INT, message: Well done!
SOLUTION FOUND: Alph4t3stf0rc3mille!!!!!!!!!

real        0m1.767s
user        0m1.712s
sys         0m0.052s

Conclusion

Through this article, we have used Miasm to deobfuscate a (simple) binary to a new one, fixed and still working.

Although this is only a toy example, one can easily imagine applying this kind of algorithm to real-world cases, to simplify a binary for further analysis. That way, this step is “independent”, outputting a working binary that other tools can process.

In addition, as Miasm doesn’t rely on external tools for disassembling / assembling, these processes are applicable to custom architectures.

One can also imagine writing a tiny packer based on Miasm.

Finally, we have only just started to play with SMT solvers; they are very powerful (and fashionable) tools, so be sure we will use them again!

Final scripts: