cea-sec/miasm

View on GitHub
miasm/jitter/jitcore_python.py

Summary

Maintainability
D
1 day
Test Coverage
from __future__ import print_function
from builtins import zip
import miasm.jitter.jitcore as jitcore
from miasm.expression.expression import ExprInt, ExprLoc
import miasm.jitter.csts as csts
from miasm.expression.simplifications import expr_simp_explicit
from miasm.jitter.emulatedsymbexec import EmulatedSymbExec

################################################################################
#                              Python jitter Core                              #
################################################################################


class JitCore_Python(jitcore.JitCore):
    "JiT management, using Miasm2 Symbol Execution engine as backend"

    SymbExecClass = EmulatedSymbExec

    def __init__(self, lifter, bin_stream):
        super(JitCore_Python, self).__init__(lifter, bin_stream)
        self.lifter = lifter
        self.ircfg = self.lifter.new_ircfg()

        # CPU & VM (None for now) will be set later

        self.symbexec = self.SymbExecClass(
            None, None,
            self.lifter, {},
            sb_expr_simp=expr_simp_explicit
        )
        self.symbexec.enable_emulated_simplifications()

    def set_cpu_vm(self, cpu, vm):
        self.symbexec.cpu = cpu
        self.symbexec.vm = vm

    def load(self):
        "Preload symbols according to current architecture"
        self.symbexec.reset_regs()

    def arch_specific(self):
        """Return arch specific information for the current architecture"""
        arch = self.lifter.arch
        has_delayslot = False
        if arch.name == "mips32":
            from miasm.arch.mips32.jit import mipsCGen
            cgen_class = mipsCGen
            has_delayslot = True
        elif arch.name == "arm":
            from miasm.arch.arm.jit import arm_CGen
            cgen_class = arm_CGen
        else:
            from miasm.jitter.codegen import CGen
            cgen_class = CGen
        return cgen_class(self.lifter), has_delayslot

    def add_block(self, asmblock):
        """Create a python function corresponding to an AsmBlock
        @asmblock: AsmBlock
        """

        # TODO: merge duplicate code with CGen, llvmconvert
        codegen, has_delayslot = self.arch_specific()
        irblocks_list = codegen.block2assignblks(asmblock)
        instr_offsets = [line.offset for line in asmblock.lines]

        loc_db = self.lifter.loc_db
        local_loc_keys = []
        for irblocks in irblocks_list:
            for irblock in irblocks:
                local_loc_keys.append(irblock.loc_key)

        def myfunc(cpu):
            """Execute the function according to cpu and vmmngr states
            @cpu: JitCpu instance
            """
            # Get virtual memory handler
            vmmngr = cpu.vmmngr

            # Get execution engine (EmulatedSymbExec instance)
            exec_engine = self.symbexec

            # Refresh CPU values according to @cpu instance
            exec_engine.update_engine_from_cpu()

            # Get initial loc_key
            cur_loc_key = asmblock.loc_key

            # Update PC helper
            update_pc = lambda value: setattr(cpu, self.lifter.pc.name, value)

            while True:
                # Retrieve the expected irblock
                for instr, irblocks in zip(asmblock.lines, irblocks_list):
                    for index, irblock in enumerate(irblocks):
                        if irblock.loc_key == cur_loc_key:
                            break
                    else:
                        continue
                    break
                else:
                    raise RuntimeError("Unable to find the block for %r" % cur_loc_key)

                instr_attrib, irblocks_attributes = codegen.get_attributes(
                    instr, irblocks, self.log_mn, self.log_regs
                )
                irblock_attributes = irblocks_attributes[index]

                # Do IRBlock
                new_irblock = self.lifter.irbloc_fix_regs_for_mode(
                    irblock, self.lifter.attrib
                )
                if index == 0:
                    # Pre code
                    if instr_attrib.log_mn:
                        print("%.8X %s" % (
                            instr_attrib.instr.offset,
                            instr_attrib.instr.to_string(loc_db)
                        ))

                # Exec IRBlock
                instr = instr_attrib.instr

                for index, assignblk in enumerate(irblock):
                    attributes = irblock_attributes[index]

                    # Eval current instruction (in IR)
                    exec_engine.eval_updt_assignblk(assignblk)

                    # Check memory access / write exception
                    # TODO: insert a check between memory reads and writes
                    if attributes.mem_read or attributes.mem_write:
                        # Restricted exception
                        flag = ~csts.EXCEPT_CODE_AUTOMOD & csts.EXCEPT_DO_NOT_UPDATE_PC
                        if (vmmngr.get_exception() & flag != 0):
                            # Do not update registers
                            update_pc(instr.offset)
                            return instr.offset

                    # Update registers values
                    exec_engine.update_cpu_from_engine()

                    # Check post assignblk exception flags
                    if attributes.set_exception:
                        # Restricted exception
                        if cpu.get_exception() > csts.EXCEPT_NUM_UPDT_EIP:
                            # Update PC
                            update_pc(instr.offset)
                            return instr.offset

                dst = exec_engine.eval_expr(self.lifter.IRDst)
                if dst.is_int():
                    loc_key = loc_db.get_or_create_offset_location(int(dst))
                    dst = ExprLoc(loc_key, dst.size)

                assert dst.is_loc()
                loc_key = dst.loc_key
                offset = loc_db.get_location_offset(loc_key)
                if offset is None:
                    # Avoid checks on generated label
                    cur_loc_key = loc_key
                    continue

                if instr_attrib.log_regs:
                    update_pc(offset)
                    cpu.dump_gpregs_with_attrib(self.lifter.attrib)

                # Post-instr checks
                if instr_attrib.mem_read | instr_attrib.mem_write:
                    vmmngr.check_memory_breakpoint()
                    vmmngr.check_invalid_code_blocs()
                    if vmmngr.get_exception():
                        update_pc(offset)
                        return offset

                if instr_attrib.set_exception:
                    if cpu.get_exception():
                        update_pc(offset)
                        return offset

                if instr_attrib.mem_read | instr_attrib.mem_write:
                    vmmngr.reset_memory_access()

                # Manage resulting address
                if (loc_key in local_loc_keys and
                    offset > instr.offset):
                    # Forward local jump
                    # Note: a backward local jump has to be promoted to extern,
                    # for max_exec_per_call support
                    cur_loc_key = loc_key
                    continue

                # Delay slot
                if has_delayslot:
                    delay_slot_set = exec_engine.eval_expr(codegen.delay_slot_set)
                    if delay_slot_set.is_int() and int(delay_slot_set) != 0:
                        return int(exec_engine.eval_expr(codegen.delay_slot_dst))

                # Extern of asmblock, must have an offset
                assert offset is not None
                return offset

        # Associate myfunc with current loc_key
        offset = loc_db.get_location_offset(asmblock.loc_key)
        assert offset is not None
        self.offset_to_jitted_func[offset] = myfunc

    def exec_wrapper(self, loc_key, cpu, _offset_to_jitted_func, _stop_offsets,
                     _max_exec_per_call):
        """Call the function @loc_key with @cpu
        @loc_key: function's loc_key
        @cpu: JitCpu instance
        """

        # Get Python function corresponding to @loc_key
        fc_ptr = self.offset_to_jitted_func[loc_key]

        # Execute the function
        return fc_ptr(cpu)