trailofbits/manticore

View on GitHub
manticore/native/plugins.py

Summary

Maintainability
A
0 mins
Test Coverage
import types

from ..core.plugin import Plugin
from .state_merging import merge_constraints, is_merge_possible, merge
import logging

logger = logging.getLogger(__name__)


class Merger(Plugin):
    def on_register(self):
        p = self.manticore._publish
        self.manticore._publish = lambda *args, **kwargs: None
        for state in self.manticore.ready_states:
            self.did_fork_state_callback(state, None, None, None, None)

        self.manticore._publish = p

    def load_state(self, state_id, delete=False):
        """
        Loads a state in Manticore with state-id = `state_id` by using the corresponding API in Executor
        :param state_id: state-id for state that is being loaded
        :param delete: If set to True, deletes the state with state-id = `state_id`
        :return: None
        """
        state = self.manticore._workspace.load_state(state_id, delete=delete)
        return state

    def delete_state(self, state_id):
        """
        Deletes a state in Manticore with state-id = `state_id` by using the corresponding API in Executor
        :param state_id: state-id for state that is being deleted
        :return: None
        """
        self.manticore.kill_state(state_id, delete=True)

    def replace_state(self, state_id, state):
        """
        Replaces a state in Manticore with state-id = `state_id` by using the corresponding API in Executor
        :param state_id: state-id for state that is being replaced
        :param state: State object that is replacing the existing state with `state_id`
        :return: None
        """
        return self.manticore._save(state, state_id=state_id)

    def did_fork_state_callback(self, state, expression, new_value, policy, children):
        state_id = state.id
        """That means no one else has to offer one Message Input

        Maintains a `cpu_stateid_dict` in context that keeps maps program counter values to a list of state-id values
        for states that will execute the instruction that that program counter as the next instruction
        :param state_id: id for state that was just enqueued
        :param state: State object that was just enqueued
        :return: None
        """
        # when a new state is addded to the list we save it so we do not have
        # to repload all states when try to merges last PC
        with self.locked_context("cpu_stateid_dict", dict) as cpu_stateid_dict:
            # as we may be running in a different process we need to access this
            # on a lock and over shared memory like this
            state_id_list = cpu_stateid_dict.get(state.cpu.PC, list())
            state_id_list.append(state_id)
            cpu_stateid_dict[state.cpu.PC] = state_id_list

    def will_load_state_callback(self, current_state_id):
        """
        Checks if the state to be loaded (referenced by `current_state_id` can be merged with another currently enqueued
        state and replaced by the merged state
        :param current_state_id: state about to be loaded
        :return: None
        """
        # When a state is loaded for exploration lets check if we can find it a
        # mate for merging
        with self.locked_context("cpu_stateid_dict", dict) as cpu_stateid_dict:
            # we get the lock and get a copy of the shared context
            merged_state = self.load_state(current_state_id)
            states_at_pc = cpu_stateid_dict.get(merged_state.cpu.PC, [])

            # lets remove ourself from the list of waiting states
            # assert current_state_id in states_at_pc #???
            if current_state_id in states_at_pc:
                states_at_pc.remove(current_state_id)

            # Iterate over all remaining states that are waiting for exploration
            # at the same PC
            merged_ids = []
            for new_state_id in states_at_pc:
                new_state = self.load_state(new_state_id)
                (exp_merged_state, exp_new_state, merged_constraint) = merge_constraints(
                    merged_state.constraints, new_state.constraints
                )
                is_mergeable, reason = is_merge_possible(merged_state, new_state, merged_constraint)

                if is_mergeable:
                    # Ok we'll merge it!
                    merged_state = merge(
                        merged_state, new_state, exp_merged_state, merged_constraint
                    )

                    # lets remove the vestigial links to the old state
                    self.delete_state(new_state_id)

                    merged_ids.append(new_state_id)
                    is_mergeable = "succeeded"
                else:
                    is_mergeable = "failed because of " + reason
                debug_string = (
                    "at PC = "
                    + hex(merged_state.cpu.PC)
                    + ", merge "
                    + is_mergeable
                    + " for state id = "
                    + str(current_state_id)
                    + " and "
                    + str(new_state_id)
                )
                logger.debug(debug_string)

            for i in merged_ids:
                states_at_pc.remove(i)

            cpu_stateid_dict[merged_state.cpu.PC] = states_at_pc

            # Ok so we have merged current_state_id with {merged_ids}
            # And removed all merged_ids from everywhere

            # UGLY we are replacing a state_id. This may be breaking caches in
            # the future
            self.replace_state(current_state_id, merged_state)


class SyscallCounter(Plugin):
    def will_execute_syscall_callback(self, state, model):
        name = model.__func__.__name__ if isinstance(model, types.MethodType) else model.__name__
        with self.locked_context("syscall_counts", dict) as counts:
            counts[name] = counts.get(name, 0) + 1

    def get_counts(self):
        with self.locked_context("syscall_counts", dict) as ctx:
            return ctx

    def did_run_callback(self):
        logger.info("Syscalls executed: %s", self.get_counts())