miasm/jitter/jitcore_python.py
from __future__ import print_function
from builtins import zip
import miasm.jitter.jitcore as jitcore
from miasm.expression.expression import ExprInt, ExprLoc
import miasm.jitter.csts as csts
from miasm.expression.simplifications import expr_simp_explicit
from miasm.jitter.emulatedsymbexec import EmulatedSymbExec
################################################################################
# Python jitter Core #
################################################################################
class JitCore_Python(jitcore.JitCore):
"JiT management, using Miasm2 Symbol Execution engine as backend"
SymbExecClass = EmulatedSymbExec
def __init__(self, lifter, bin_stream):
super(JitCore_Python, self).__init__(lifter, bin_stream)
self.lifter = lifter
self.ircfg = self.lifter.new_ircfg()
# CPU & VM (None for now) will be set later
self.symbexec = self.SymbExecClass(
None, None,
self.lifter, {},
sb_expr_simp=expr_simp_explicit
)
self.symbexec.enable_emulated_simplifications()
def set_cpu_vm(self, cpu, vm):
self.symbexec.cpu = cpu
self.symbexec.vm = vm
def load(self):
"Preload symbols according to current architecture"
self.symbexec.reset_regs()
def arch_specific(self):
"""Return arch specific information for the current architecture"""
arch = self.lifter.arch
has_delayslot = False
if arch.name == "mips32":
from miasm.arch.mips32.jit import mipsCGen
cgen_class = mipsCGen
has_delayslot = True
elif arch.name == "arm":
from miasm.arch.arm.jit import arm_CGen
cgen_class = arm_CGen
else:
from miasm.jitter.codegen import CGen
cgen_class = CGen
return cgen_class(self.lifter), has_delayslot
def add_block(self, asmblock):
"""Create a python function corresponding to an AsmBlock
@asmblock: AsmBlock
"""
# TODO: merge duplicate code with CGen, llvmconvert
codegen, has_delayslot = self.arch_specific()
irblocks_list = codegen.block2assignblks(asmblock)
instr_offsets = [line.offset for line in asmblock.lines]
loc_db = self.lifter.loc_db
local_loc_keys = []
for irblocks in irblocks_list:
for irblock in irblocks:
local_loc_keys.append(irblock.loc_key)
def myfunc(cpu):
"""Execute the function according to cpu and vmmngr states
@cpu: JitCpu instance
"""
# Get virtual memory handler
vmmngr = cpu.vmmngr
# Get execution engine (EmulatedSymbExec instance)
exec_engine = self.symbexec
# Refresh CPU values according to @cpu instance
exec_engine.update_engine_from_cpu()
# Get initial loc_key
cur_loc_key = asmblock.loc_key
# Update PC helper
update_pc = lambda value: setattr(cpu, self.lifter.pc.name, value)
while True:
# Retrieve the expected irblock
for instr, irblocks in zip(asmblock.lines, irblocks_list):
for index, irblock in enumerate(irblocks):
if irblock.loc_key == cur_loc_key:
break
else:
continue
break
else:
raise RuntimeError("Unable to find the block for %r" % cur_loc_key)
instr_attrib, irblocks_attributes = codegen.get_attributes(
instr, irblocks, self.log_mn, self.log_regs
)
irblock_attributes = irblocks_attributes[index]
# Do IRBlock
new_irblock = self.lifter.irbloc_fix_regs_for_mode(
irblock, self.lifter.attrib
)
if index == 0:
# Pre code
if instr_attrib.log_mn:
print("%.8X %s" % (
instr_attrib.instr.offset,
instr_attrib.instr.to_string(loc_db)
))
# Exec IRBlock
instr = instr_attrib.instr
for index, assignblk in enumerate(irblock):
attributes = irblock_attributes[index]
# Eval current instruction (in IR)
exec_engine.eval_updt_assignblk(assignblk)
# Check memory access / write exception
# TODO: insert a check between memory reads and writes
if attributes.mem_read or attributes.mem_write:
# Restricted exception
flag = ~csts.EXCEPT_CODE_AUTOMOD & csts.EXCEPT_DO_NOT_UPDATE_PC
if (vmmngr.get_exception() & flag != 0):
# Do not update registers
update_pc(instr.offset)
return instr.offset
# Update registers values
exec_engine.update_cpu_from_engine()
# Check post assignblk exception flags
if attributes.set_exception:
# Restricted exception
if cpu.get_exception() > csts.EXCEPT_NUM_UPDT_EIP:
# Update PC
update_pc(instr.offset)
return instr.offset
dst = exec_engine.eval_expr(self.lifter.IRDst)
if dst.is_int():
loc_key = loc_db.get_or_create_offset_location(int(dst))
dst = ExprLoc(loc_key, dst.size)
assert dst.is_loc()
loc_key = dst.loc_key
offset = loc_db.get_location_offset(loc_key)
if offset is None:
# Avoid checks on generated label
cur_loc_key = loc_key
continue
if instr_attrib.log_regs:
update_pc(offset)
cpu.dump_gpregs_with_attrib(self.lifter.attrib)
# Post-instr checks
if instr_attrib.mem_read | instr_attrib.mem_write:
vmmngr.check_memory_breakpoint()
vmmngr.check_invalid_code_blocs()
if vmmngr.get_exception():
update_pc(offset)
return offset
if instr_attrib.set_exception:
if cpu.get_exception():
update_pc(offset)
return offset
if instr_attrib.mem_read | instr_attrib.mem_write:
vmmngr.reset_memory_access()
# Manage resulting address
if (loc_key in local_loc_keys and
offset > instr.offset):
# Forward local jump
# Note: a backward local jump has to be promoted to extern,
# for max_exec_per_call support
cur_loc_key = loc_key
continue
# Delay slot
if has_delayslot:
delay_slot_set = exec_engine.eval_expr(codegen.delay_slot_set)
if delay_slot_set.is_int() and int(delay_slot_set) != 0:
return int(exec_engine.eval_expr(codegen.delay_slot_dst))
# Extern of asmblock, must have an offset
assert offset is not None
return offset
# Associate myfunc with current loc_key
offset = loc_db.get_location_offset(asmblock.loc_key)
assert offset is not None
self.offset_to_jitted_func[offset] = myfunc
def exec_wrapper(self, loc_key, cpu, _offset_to_jitted_func, _stop_offsets,
_max_exec_per_call):
"""Call the function @loc_key with @cpu
@loc_key: function's loc_key
@cpu: JitCpu instance
"""
# Get Python function corresponding to @loc_key
fc_ptr = self.offset_to_jitted_func[loc_key]
# Execute the function
return fc_ptr(cpu)