cea-sec/miasm

View on GitHub
example/disasm/full.py

Summary

Maintainability
C
7 hrs
Test Coverage
from __future__ import print_function
import logging
from argparse import ArgumentParser
from pdb import pm

from future.utils import viewitems, viewvalues

from miasm.analysis.binary import Container
from miasm.core.asmblock import log_asmblock, AsmCFG
from miasm.core.interval import interval
from miasm.analysis.machine import Machine
from miasm.analysis.data_flow import \
    DiGraphDefUse, ReachingDefinitions, load_from_int
from miasm.expression.simplifications import expr_simp
from miasm.analysis.ssa import SSADiGraph
from miasm.ir.ir import AssignBlock, IRBlock
from miasm.analysis.simplifier import IRCFGSimplifierCommon, IRCFGSimplifierSSA
from miasm.core.locationdb import LocationDB

log = logging.getLogger("dis")
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter("%(levelname)-5s: %(message)s"))
log.addHandler(console_handler)
log.setLevel(logging.INFO)


parser = ArgumentParser("Disassemble a binary")
parser.add_argument('filename', help="File to disassemble")
parser.add_argument('address', help="Starting address for disassembly engine",
                    nargs="*")
parser.add_argument('-m', '--architecture', help="architecture: " + \
                        ",".join(Machine.available_machine()))
parser.add_argument('-f', "--followcall", action="store_true",
                    help="Follow call instructions")
parser.add_argument('-b', "--blockwatchdog", default=None, type=int,
                    help="Maximum number of basic block to disassemble")
parser.add_argument('-n', "--funcswatchdog", default=None, type=int,
                    help="Maximum number of function to disassemble")
parser.add_argument('-r', "--recurfunctions", action="store_true",
                    help="Disassemble founded functions")
parser.add_argument('-v', "--verbose", action="count", help="Verbose mode",
                    default=0)
parser.add_argument('-g', "--gen_ir", action="store_true",
                    help="Compute the intermediate representation")
parser.add_argument('-z', "--dis-nulstart-block", action="store_true",
                    help="Do not disassemble NULL starting block")
parser.add_argument('-l', "--dontdis-retcall", action="store_true",
                    help="If set, disassemble only call destinations")
parser.add_argument('-s', "--simplify", action="count",
                    help="Apply simplifications rules (liveness, graph simplification, ...)",
                    default=0)
parser.add_argument("--base-address", default=0,
                    type=lambda x: int(x, 0),
                    help="Base address of the input binary")
parser.add_argument('-a', "--try-disasm-all", action="store_true",
                    help="Try to disassemble the whole binary")
parser.add_argument('-i', "--image", action="store_true",
                    help="Display image representation of disasm")
parser.add_argument('-c', "--rawbinary", default=False, action="store_true",
                    help="Don't interpret input as ELF/PE/...")
parser.add_argument('-d', "--defuse", action="store_true",
                    help="Dump the def-use graph in file 'defuse.dot'."
                    "The defuse is dumped after simplifications if -s option is specified")
parser.add_argument('-p', "--ssa", action="store_true",
                    help="Generate the ssa form in  'ssa.dot'.")
parser.add_argument('-x', "--propagexpr", action="store_true",
                    help="Do Expression propagation.")
parser.add_argument('-e', "--loadint", action="store_true",
                    help="Load integers from binary in fixed memory lookup.")
parser.add_argument('-j', "--calldontmodstack", action="store_true",
                    help="Consider stack high is not modified in subcalls")


args = parser.parse_args()

if args.verbose:
    log_asmblock.setLevel(logging.DEBUG)

loc_db = LocationDB()
log.info('Load binary')
if args.rawbinary:
    cont = Container.fallback_container(
        open(args.filename, "rb").read(),
        vm=None, addr=args.base_address,
        loc_db=loc_db,
    )
else:
    with open(args.filename, "rb") as fdesc:
        cont = Container.from_stream(
            fdesc, addr=args.base_address,
            loc_db=loc_db,
        )

default_addr = cont.entry_point
bs = cont.bin_stream
e = cont.executable
log.info('ok')

log.info("import machine...")
# Use the guessed architecture or the specified one
arch = args.architecture if args.architecture else cont.arch
if not arch:
    print("Architecture recognition fail. Please specify it in arguments")
    exit(-1)

# Instance the arch-dependent machine
machine = Machine(arch)
mn, dis_engine = machine.mn, machine.dis_engine
log.info('ok')

mdis = dis_engine(bs, loc_db=cont.loc_db)
# configure disasm engine
mdis.dontdis_retcall = args.dontdis_retcall
mdis.blocs_wd = args.blockwatchdog
mdis.dont_dis_nulstart_bloc = not args.dis_nulstart_block
mdis.follow_call = args.followcall

todo = []
addrs = []
for addr in args.address:
    try:
        addrs.append(int(addr, 0))
    except ValueError:
        # Second chance, try with symbol
        loc_key = mdis.loc_db.get_name_location(addr)
        offset = mdis.loc_db.get_location_offset(loc_key)
        addrs.append(offset)

if len(addrs) == 0 and default_addr is not None:
    addrs.append(default_addr)
for ad in addrs:
    todo += [(mdis, None, ad)]

done = set()
all_funcs = set()
all_funcs_blocks = {}


done_interval = interval()
finish = False

entry_points = set()
# Main disasm loop
while not finish and todo:
    while not finish and todo:
        mdis, caller, ad = todo.pop(0)
        if ad in done:
            continue
        done.add(ad)
        asmcfg = mdis.dis_multiblock(ad)
        entry_points.add(mdis.loc_db.get_offset_location(ad))

        log.info('func ok %.16x (%d)' % (ad, len(all_funcs)))

        all_funcs.add(ad)
        all_funcs_blocks[ad] = asmcfg
        for block in asmcfg.blocks:
            for l in block.lines:
                done_interval += interval([(l.offset, l.offset + l.l)])

        if args.funcswatchdog is not None:
            args.funcswatchdog -= 1
        if args.recurfunctions:
            for block in asmcfg.blocks:
                instr = block.get_subcall_instr()
                if not instr:
                    continue
                for dest in instr.getdstflow(mdis.loc_db):
                    if not dest.is_loc():
                        continue
                    offset = mdis.loc_db.get_location_offset(dest.loc_key)
                    todo.append((mdis, instr, offset))

        if args.funcswatchdog is not None and args.funcswatchdog <= 0:
            finish = True

    if args.try_disasm_all:
        for a, b in done_interval.intervals:
            if b in done:
                continue
            log.debug('add func %s' % hex(b))
            todo.append((mdis, None, b))


# Generate dotty graph
all_asmcfg = AsmCFG(mdis.loc_db)
for blocks in viewvalues(all_funcs_blocks):
    all_asmcfg += blocks


log.info('generate graph file')
open('graph_execflow.dot', 'w').write(all_asmcfg.dot(offset=True))

log.info('generate intervals')

all_lines = []
total_l = 0

print(done_interval)
if args.image:
    log.info('build img')
    done_interval.show()

for i, j in done_interval.intervals:
    log.debug((hex(i), "->", hex(j)))


all_lines.sort(key=lambda x: x.offset)
open('lines.dot', 'w').write('\n'.join(str(l) for l in all_lines))
log.info('total lines %s' % total_l)


if args.propagexpr:
    args.gen_ir = True


class LifterDelModCallStack(machine.lifter_model_call):
        def call_effects(self, addr, instr):
            assignblks, extra = super(LifterDelModCallStack, self).call_effects(addr, instr)
            if not args.calldontmodstack:
                return assignblks, extra
            out = []
            for assignblk in assignblks:
                dct = dict(assignblk)
                dct = {
                    dst:src for (dst, src) in viewitems(dct) if dst != self.sp
                }
                out.append(AssignBlock(dct, assignblk.instr))
            return out, extra


# Bonus, generate IR graph
if args.gen_ir:
    log.info("Lift and Lift with modeled calls")

    lifter = machine.lifter(mdis.loc_db)
    lifter_model_call = LifterDelModCallStack(mdis.loc_db)

    ircfg = lifter.new_ircfg()
    ircfg_model_call = lifter.new_ircfg()

    head = list(entry_points)[0]

    for ad, asmcfg in viewitems(all_funcs_blocks):
        log.info("generating IR... %x" % ad)
        for block in asmcfg.blocks:
            lifter.add_asmblock_to_ircfg(block, ircfg)
            lifter_model_call.add_asmblock_to_ircfg(block, ircfg_model_call)

    log.info("Print blocks (without analyse)")
    for label, block in viewitems(ircfg.blocks):
        print(block)

    log.info("Gen Graph... %x" % ad)

    log.info("Print blocks (with analyse)")
    for label, block in viewitems(ircfg_model_call.blocks):
        print(block)

    if args.simplify > 0:
        log.info("Simplify...")
        ircfg_simplifier = IRCFGSimplifierCommon(lifter_model_call)
        ircfg_simplifier.simplify(ircfg_model_call, head)
        log.info("ok...")

    if args.defuse:
        reachings = ReachingDefinitions(ircfg_model_call)
        open('graph_defuse.dot', 'w').write(DiGraphDefUse(reachings).dot())

    out = ircfg.dot()
    open('graph_irflow_raw.dot', 'w').write(out)
    out = ircfg_model_call.dot()
    open('graph_irflow.dot', 'w').write(out)

    if args.ssa and not args.propagexpr:
        if len(entry_points) != 1:
            raise RuntimeError("Your graph should have only one head")
        ssa = SSADiGraph(ircfg_model_call)
        ssa.transform(head)
        open("ssa.dot", "w").write(ircfg_model_call.dot())


if args.propagexpr:

    def is_addr_ro_variable(bs, addr, size):
        """
        Return True if address at @addr is a read-only variable.
        WARNING: Quick & Dirty

        @addr: integer representing the address of the variable
        @size: size in bits

        """
        try:
            _ = bs.getbytes(addr, size // 8)
        except IOError:
            return False
        return True

    class CustomIRCFGSimplifierSSA(IRCFGSimplifierSSA):
        def do_simplify(self, ssa, head):
            modified = super(CustomIRCFGSimplifierSSA, self).do_simplify(ssa, head)
            if args.loadint:
                modified |= load_from_int(ssa.graph, bs, is_addr_ro_variable)

        def simplify(self, ircfg, head):
            ssa = self.ircfg_to_ssa(ircfg, head)
            ssa = self.do_simplify_loop(ssa, head)
            ircfg = self.ssa_to_unssa(ssa, head)

            ircfg_simplifier = IRCFGSimplifierCommon(self.lifter)
            ircfg_simplifier.deadremoval.add_expr_to_original_expr(ssa.ssa_variable_to_expr)
            ircfg_simplifier.simplify(ircfg, head)
            return ircfg

    head = list(entry_points)[0]
    simplifier = CustomIRCFGSimplifierSSA(lifter_model_call)
    ircfg = simplifier.simplify(ircfg_model_call, head)
    open('final.dot', 'w').write(ircfg.dot())