cea-sec/miasm

View on GitHub
example/expression/solve_condition_stp.py

Summary

Maintainability
B
6 hrs
Test Coverage
from __future__ import print_function

import subprocess
import sys
from optparse import OptionParser

from future.utils import viewitems

from miasm.analysis.binary import Container
from miasm.analysis.machine import Machine
from miasm.core import parse_asm
from miasm.core.locationdb import LocationDB
from miasm.expression.expression import ExprInt, ExprCond, ExprId, \
    get_expr_ids, ExprAssign, ExprLoc
from miasm.expression.simplifications import expr_simp
from miasm.ir.symbexec import SymbolicExecutionEngine, get_block
from miasm.ir.translators.translator import Translator

# Architecture description used by the rest of the script.
machine = Machine("x86_32")


# Command line: one positional file, plus an optional address
# (given as a string, "0" when omitted).
parser = OptionParser(usage="usage: %prog [options] file")
parser.add_option(
    '-a', "--address",
    dest="address",
    metavar="ADDRESS",
    default="0",
    help="address to disasemble",
)

options, args = parser.parse_args(sys.argv[1:])
if not args:
    # No input file given: show the usage text and leave.
    parser.print_help()
    sys.exit(0)


def emul_symb(lifter, ircfg, mdis, states_todo, states_done):
    """Symbolically execute blocks until no pending state remains.

    A state is an ``(addr, symbols, conds)`` tuple: the address (integer or
    location expression) of the next block to run, the symbolic state to
    start from, and the tuple of ``(condition, value)`` pairs assumed along
    the path leading to that block.

    @lifter: lifter used to instantiate the symbolic execution engines
    @ircfg: IR control flow graph handed to get_block
    @mdis: disassembly engine handed to get_block
    @states_todo: set of pending states; emptied by this function
    @states_done: set of processed states; filled by this function

    Raises ValueError when a destination is neither a conditional, an
    integer nor a location, or when one side of a conditional destination
    cannot be reduced to an integer or a location.

    NOTE: the end of the emulated code is detected by comparing the
    destination with the module-level name ``ret_addr``, which must be
    defined before this function is called with a non-empty @states_todo.
    """
    while states_todo:
        addr, symbols, conds = states_todo.pop()
        print('*' * 40, "addr", addr, '*' * 40)
        if (addr, symbols, conds) in states_done:
            print('Known state, skipping', addr)
            continue
        states_done.add((addr, symbols, conds))

        # Work on a copy so the stored state is left untouched
        symbexec = SymbolicExecutionEngine(lifter)
        symbexec.symbols = symbols.copy()
        # Remove the program counter entry carried over from the parent state
        if lifter.pc in symbexec.symbols:
            del symbexec.symbols[lifter.pc]
        irblock = get_block(lifter, ircfg, mdis, addr)

        print('Run block:')
        print(irblock)
        addr = symbexec.eval_updt_irblock(irblock)
        print('Final state:')
        symbexec.dump(mems=False)

        assert addr is not None
        if isinstance(addr, ExprCond):
            # Create 2 states, each including complementary conditions
            cond_group_a = {addr.cond: ExprInt(0, addr.cond.size)}
            cond_group_b = {addr.cond: ExprInt(1, addr.cond.size)}
            addr_a = expr_simp(
                symbexec.eval_expr(addr.replace_expr(cond_group_a), {})
            )
            addr_b = expr_simp(
                symbexec.eval_expr(addr.replace_expr(cond_group_b), {})
            )
            # BOTH destinations must be an integer or a location. The
            # un-parenthesized "a or b and c or d" form would accept the
            # pair as soon as only one side qualifies.
            if not all(dst.is_int() or dst.is_loc()
                       for dst in (addr_a, addr_b)):
                print(str(addr_a), str(addr_b))
                raise ValueError("Unsupported condition")
            if isinstance(addr_a, ExprInt):
                addr_a = int(addr_a.arg)
            if isinstance(addr_b, ExprInt):
                addr_b = int(addr_b.arg)
            # Each successor inherits the path conditions plus its own one
            states_todo.add((
                addr_a,
                symbexec.symbols.copy(),
                tuple(conds) + tuple(viewitems(cond_group_a)),
            ))
            states_todo.add((
                addr_b,
                symbexec.symbols.copy(),
                tuple(conds) + tuple(viewitems(cond_group_b)),
            ))
        elif addr == ret_addr:
            print('Return address reached')
            continue
        elif addr.is_int():
            addr = int(addr.arg)
            states_todo.add((addr, symbexec.symbols.copy(), tuple(conds)))
        elif addr.is_loc():
            states_todo.add((addr, symbexec.symbols.copy(), tuple(conds)))
        else:
            raise ValueError("Unsupported destination")


if __name__ == '__main__':
    loc_db = LocationDB()
    translator_smt2 = Translator.to_language("smt2")

    # The start address is read as hexadecimal from the command line
    addr = int(options.address, 16)

    # Use a context manager so the input file is closed once the container
    # has been built.
    # NOTE(review): assumes Container.from_stream reads the stream eagerly
    # and does not keep it for later use - confirm against the miasm API.
    with open(args[0], 'rb') as fdesc:
        cont = Container.from_stream(fdesc, loc_db)
    mdis = machine.dis_engine(cont.bin_stream, loc_db=loc_db)
    lifter = machine.lifter(mdis.loc_db)
    ircfg = lifter.new_ircfg()
    symbexec = SymbolicExecutionEngine(lifter)

    # Small prologue pushing the (symbolic) arguments and return address
    asmcfg = parse_asm.parse_txt(
        machine.mn, 32, '''
    init:
    PUSH argv
    PUSH argc
    PUSH ret_addr
    ''',
        loc_db
    )

    argc_lbl = loc_db.get_name_location('argc')
    argv_lbl = loc_db.get_name_location('argv')
    ret_addr_lbl = loc_db.get_name_location('ret_addr')
    init_lbl = loc_db.get_name_location('init')

    argc_loc = ExprLoc(argc_lbl, 32)
    argv_loc = ExprLoc(argv_lbl, 32)
    ret_addr_loc = ExprLoc(ret_addr_lbl, 32)

    # Also read by emul_symb() as a module-level name to stop the emulation
    ret_addr = ExprId("ret_addr", ret_addr_loc.size)

    # Replace the location operands of the prologue by plain identifiers
    fix_args = {
        argc_loc: ExprId("argc", argc_loc.size),
        argv_loc: ExprId("argv", argv_loc.size),
        ret_addr_loc: ret_addr,
    }

    block = asmcfg.loc_key_to_block(init_lbl)
    for instr in block.lines:
        for i, arg in enumerate(instr.args):
            instr.args[i] = arg.replace_expr(fix_args)
    print(block)

    # Lift the prologue into the IR CFG and run it symbolically to build
    # the initial state
    lifter.add_asmblock_to_ircfg(block, ircfg)
    irb = ircfg.blocks[init_lbl]
    symbexec.eval_updt_irblock(irb)
    symbexec.dump(ids=False)
    # reset lifter blocks
    lifter.blocks = {}

    states_todo = set()
    states_done = set()
    states_todo.add((addr, symbexec.symbols, ()))

    # emul blocks, propagate states
    emul_symb(lifter, ircfg, mdis, states_todo, states_done)

    all_info = []

    print('*' * 40, 'conditions to match', '*' * 40)
    # States are ordered by their string form to get a stable listing
    for addr, symbols, conds in sorted(states_done, key=str):
        print('*' * 40, addr, '*' * 40)
        for k, v in conds:
            print(k, v)
        all_info.append((addr, list(conds)))

    all_cases = set()

    symbexec = SymbolicExecutionEngine(lifter)
    for addr, reqs_cond in all_info:
        out = ['(set-logic QF_ABV)',
               '(set-info :smt-lib-version 2.0)']

        conditions = []
        all_ids = set()
        for expr, value in reqs_cond:
            all_ids.update(get_expr_ids(expr))
            # Turn the condition into a 0/1 value of the recorded size and
            # tie it to the value assumed on this path
            expr_test = ExprCond(expr,
                                 ExprInt(1, value.size),
                                 ExprInt(0, value.size))
            cond = translator_smt2.from_expr(ExprAssign(expr_test, value))
            conditions.append(cond)

        for name in all_ids:
            out.append("(declare-fun %s () (_ BitVec %d))" % (name, name.size))
        # `out` always holds the two header lines, so testing it can never
        # skip anything: test the collected constraints instead.
        if not conditions:
            continue

        out += conditions
        out.append('(check-sat)')
        # Close the query file before handing it to the solver, so that its
        # content is fully written on disk
        with open('out.dot', 'w') as fdesc:
            fdesc.write('\n'.join(out))
        try:
            cases = subprocess.check_output(["stp",
                                             "-p", '--SMTLIB2',
                                             "out.dot"])
        except OSError as e:
            print("Cannot execute 'stp':", e.strerror)
            break
        # Keep the lines of the solver output starting with ASSERT
        for c in cases.split(b'\n'):
            if c.startswith(b'ASSERT'):
                all_cases.add((addr, c))

    print('*' * 40, 'ALL COND', '*' * 40)
    # NOTE(review): addr may be an int or a location expression (both are
    # queued by emul_symb); ordering mixed keys may fail - confirm.
    all_cases = sorted(all_cases)
    for addr, val in all_cases:
        print('Address:', addr, 'is reachable using argc', val)