trailofbits/manticore

View on GitHub
manticore/ethereum/detectors.py

Summary

Maintainability
A
0 mins
Test Coverage
import hashlib
from typing import Optional
import logging
from contextlib import contextmanager

from ..core.smtlib import (
    Operators,
    Constant,
    simplify,
    istainted,
    issymbolic,
    get_taints,
    taint_with,
)
from ..core.plugin import Plugin
from ..utils.enums import DetectorClassification

logger = logging.getLogger(__name__)


class Detector(Plugin):
    """
    Common base for detectors.

    Findings are recorded twice: per state (in ``state.context``) and globally
    (in a locked manticore context). Both stores are keyed by the detector's
    class name.
    """

    # argument that needs to be passed to --detect to use given detector
    ARGUMENT: Optional[str] = None
    # help string
    HELP: Optional[str] = None
    # DetectorClassification value
    IMPACT: Optional[DetectorClassification] = None
    # DetectorClassification value
    CONFIDENCE: Optional[DetectorClassification] = None

    @property
    def name(self):
        """Name of the concrete detector class; used as prefix for context keys."""
        return self.__class__.__name__.rpartition(".")[2]

    def get_findings(self, state):
        """
        Return the per-state findings list, creating it on first use.
        :param state: state whose context holds the findings
        """
        key = "{:s}.findings".format(self.name)
        return state.context.setdefault(key, [])

    @contextmanager
    def locked_global_findings(self):
        """Context manager yielding the global findings list while holding the manticore context lock."""
        key = "{:s}.global_findings".format(self.name)
        with self.manticore.locked_context(key, list) as shared_findings:
            yield shared_findings

    @property
    def global_findings(self):
        """The global findings list (the reference is handed out after the lock is released)."""
        with self.locked_global_findings() as shared_findings:
            return shared_findings

    def add_finding(self, state, address, pc, finding, at_init, constraint=True):
        """
        Record a finding at the given contract address and program counter.
        :param state: current state
        :param address: contract address of the finding
        :param pc: program counter of the finding; may be symbolic as long as it simplifies to a constant
        :param finding: textual description of the finding
        :param at_init: true if executing the constructor
        :param constraint: finding is considered reproducible only when constraint is True
        :raises ValueError: if pc cannot be reduced to an int
        """
        # Reduce the program counter to a plain python int
        if issymbolic(pc):
            pc = simplify(pc)
        if isinstance(pc, Constant):
            pc = pc.value
        if not isinstance(pc, int):
            raise ValueError("PC must be a number")

        record = (address, pc, finding, at_init)
        # The per-state entry additionally carries the constraint
        self.get_findings(state).append(record + (constraint,))
        with self.locked_global_findings() as shared_findings:
            shared_findings.append(record)
        # Fixme for ever broken logger
        logger.warning(finding)

    def add_finding_here(self, state, finding, constraint=True):
        """
        Record a finding at the contract and program counter currently being executed.
        :param state: current state
        :param finding: textual description of the finding
        :param constraint: finding is considered reproducible only when constraint is True
        """
        world = state.platform
        vm = world.current_vm
        at_init = world.current_transaction.sort == "CREATE"
        self.add_finding(state, vm.address, vm.pc, finding, at_init, constraint)

    def _save_current_location(self, state, finding, condition=True):
        """
        Remember the current location in the state's locations table and return its textual id.
        Saved locations can later be promoted to findings if other conditions hold.
        See _get_location()
        :param state: current state
        :param finding: textual description of the finding
        :param condition: general purpose constraint
        :return: sha1 hex digest of the textual form of the saved location tuple
        """
        world = state.platform
        vm = world.current_vm
        location = (
            vm.address,
            vm.pc,
            finding,
            world.current_transaction.sort == "CREATE",
            condition,
        )
        hash_id = hashlib.sha1(str(location).encode()).hexdigest()
        table = state.context.setdefault("{:s}.locations".format(self.name), {})
        table[hash_id] = location
        return hash_id

    def _get_location(self, state, hash_id):
        """
        Look up a location previously stored by _save_current_location().
        A location is composed of: address, pc, finding, at_init, condition
        """
        table = state.context.setdefault("{:s}.locations".format(self.name), {})
        return table[hash_id]

    def _get_src(self, address, pc):
        """Return the source associated with `pc` according to the metadata manticore holds for `address`."""
        metadata = self.manticore.get_metadata(address)
        return metadata.get_source_for(pc)


class DetectEnvInstruction(Detector):
    """
    Detect the usage of instructions that query environmental/block information:
    BLOCKHASH, COINBASE, TIMESTAMP, NUMBER, DIFFICULTY, GASLIMIT, ORIGIN, GASPRICE

    Environmental information can sometimes be manipulated, so contracts should
    generally avoid relying on it, except in special situations. A notable one is
    programmatically detecting human transactions with `sender == origin`.
    """

    ARGUMENT = "env-instr"
    HELP = "Use of potentially unsafe/manipulable instructions"
    IMPACT = DetectorClassification.MEDIUM
    CONFIDENCE = DetectorClassification.HIGH

    def will_evm_execute_instruction_callback(self, state, instruction, arguments):
        """Report a finding right before any environment-querying instruction executes."""
        env_mnemonics = (
            "BLOCKHASH",
            "COINBASE",
            "TIMESTAMP",
            "NUMBER",
            "DIFFICULTY",
            "GASLIMIT",
            "ORIGIN",
            "GASPRICE",
        )
        if instruction.semantics not in env_mnemonics:
            return
        self.add_finding_here(state, f"Warning {instruction.semantics} instruction used")


class DetectSuicidal(Detector):
    """Reports every SELFDESTRUCT instruction that is about to be executed."""

    ARGUMENT = "suicidal"
    HELP = "Reachable selfdestruct instructions"
    IMPACT = DetectorClassification.MEDIUM
    CONFIDENCE = DetectorClassification.HIGH

    def will_evm_execute_instruction_callback(self, state, instruction, arguments):
        """Add a finding at the current location when the next instruction is SELFDESTRUCT."""
        if instruction.semantics != "SELFDESTRUCT":
            return
        self.add_finding_here(state, "Reachable SELFDESTRUCT")


class DetectExternalCallAndLeak(Detector):
    """
    Reports CALL instructions that can reach the transaction sender or an
    address the caller controls, distinguishing value transfers ("ether leak",
    sent value != 0) from plain external calls (sent value == 0).
    """

    ARGUMENT = "ext-call-leak"
    HELP = "Reachable external call or ether leak to sender or arbitrary address"
    IMPACT = DetectorClassification.MEDIUM
    CONFIDENCE = DetectorClassification.HIGH

    def will_evm_execute_instruction_callback(self, state, instruction, arguments):
        """
        Inspect a CALL about to execute and record the applicable findings.
        :param state: current state
        :param instruction: instruction about to be executed
        :param arguments: instruction operands; for CALL, index 1 is the destination and index 2 the value
        """
        if instruction.semantics != "CALL":
            return

        dest_address = arguments[1]
        sent_value = arguments[2]
        msg_sender = state.platform.current_vm.caller

        if issymbolic(dest_address):
            # We assume dest_address is symbolic because it came from symbolic tx data (user input argument)
            # Operators.AND is used explicitly: a bare `AND` is not defined in this module.
            to_sender = msg_sender == dest_address
            self.add_finding_here(
                state,
                "Reachable ether leak to sender via argument",
                constraint=Operators.AND(to_sender, sent_value != 0),
            )
            self.add_finding_here(
                state,
                "Reachable external call to sender via argument",
                constraint=Operators.AND(to_sender, sent_value == 0),
            )

            # ok it can't go to the sender, but can it go to arbitrary addresses? (> 1 other address?)
            # we report nothing if it can't go to > 1 other addresses since that means the code constrained
            # to a specific address at some point, and that was probably intentional. attacker has basically
            # no control.
            possible_destinations = state.solve_n(dest_address, 2)
            if len(possible_destinations) > 1:
                # This might be a false positive if the dest_address can't actually be solved to anything
                # useful/exploitable, even though it can be solved to more than 1 thing
                not_to_sender = msg_sender != dest_address
                self.add_finding_here(
                    state,
                    "Reachable ether leak to user controlled address via argument",
                    constraint=Operators.AND(not_to_sender, sent_value != 0),
                )
                self.add_finding_here(
                    state,
                    "Reachable external call to user controlled address via argument",
                    constraint=Operators.AND(not_to_sender, sent_value == 0),
                )
        elif msg_sender == dest_address:
            # Concrete destination that equals the sender
            self.add_finding_here(
                state, "Reachable ether leak to sender", constraint=sent_value != 0
            )
            self.add_finding_here(
                state, "Reachable external call to sender", constraint=sent_value == 0
            )


class DetectInvalid(Detector):
    """Reports INVALID instructions that are about to be executed."""

    ARGUMENT = "invalid"
    HELP = "Enable INVALID instruction detection"
    IMPACT = DetectorClassification.LOW
    CONFIDENCE = DetectorClassification.HIGH

    def __init__(self, only_human=True, **kwargs):
        """
        Detects INVALID instructions.

        INVALID was originally meant to signal exceptional code, but in practice
        it is used in different ways, so this detector may generate a great deal
        of false positives.

        :param only_human: if True report only INVALID at depth 0 transactions
        """
        super().__init__(**kwargs)
        self._only_human = only_human

    def will_evm_execute_instruction_callback(self, state, instruction, arguments):
        """Add a finding when the next instruction is INVALID (optionally only at transaction depth 0)."""
        if instruction.semantics != "INVALID":
            return
        # When restricted to human transactions, ignore anything below depth 0
        if self._only_human and state.platform.current_transaction.depth != 0:
            return
        self.add_finding_here(state, "INVALID instruction")


class DetectReentrancySimple(Detector):
    """
    Simple detector for reentrancy bugs.
    Alert if contract changes the state of storage (does a write) after a call with >2300 gas to a user controlled/symbolic
    external address or the msg.sender address.
    """

    ARGUMENT = "reentrancy"
    HELP = "Reentrancy bug"
    IMPACT = DetectorClassification.HIGH
    CONFIDENCE = DetectorClassification.HIGH

    @property
    def _context_key(self):
        """Key under which the list of dangerous call locations is kept in state.context."""
        return f"{self.name}.call_locations"

    def will_open_transaction_callback(self, state, tx):
        """Start with an empty list of call locations on every human transaction."""
        if tx.is_human:
            state.context[self._context_key] = []

    def will_evm_execute_instruction_callback(self, state, instruction, arguments):
        """
        Remember CALLs that may forward more than 2300 gas to a symbolic address or to the sender.
        :param arguments: instruction operands; for CALL, index 0 is the gas and index 1 the destination
        """
        if instruction.semantics != "CALL":
            return

        gas = arguments[0]
        dest_address = arguments[1]
        msg_sender = state.platform.current_vm.caller
        pc = state.platform.current_vm.pc

        is_enough_gas = Operators.UGT(gas, 2300)
        if not state.can_be_true(is_enough_gas):
            return

        # flag any external call that's going to a symbolic/user controlled address, or that's going
        # concretely to the sender's address
        if issymbolic(dest_address) or msg_sender == dest_address:
            # setdefault (not get) so the entry is kept even if the list was never
            # initialised; appending to a `get` default would silently drop it.
            state.context.setdefault(self._context_key, []).append((pc, is_enough_gas))

    def did_evm_write_storage_callback(self, state, address, offset, value):
        """Promote every remembered dangerous call of this state to a finding once storage is written."""
        locs = state.context.get(self._context_key, [])
        if not locs:
            return

        # if we're here and locs has stuff in it. by definition this state has
        # encountered a dangerous call and is now at a write.
        addr = state.platform.current_vm.address
        at_init = state.platform.current_transaction.sort == "CREATE"
        for callpc, gas_constraint in locs:
            self.add_finding(
                state,
                addr,
                callpc,
                "Potential reentrancy vulnerability",
                at_init,
                constraint=gas_constraint,
            )


class DetectReentrancyAdvanced(Detector):
    """
    Detector for reentrancy bugs.
    Given an optional concrete list of attacker addresses, warn on the following conditions.

    1) A _successful_ call to an attacker address (address in attacker list), or any human account address
    (if no list is given). With enough gas (>2300).

    2) A SSTORE after the execution of the CALL.

    3) The storage slot of the SSTORE must be used in some path to control flow
    """

    ARGUMENT = "reentrancy-adv"
    HELP = "Reentrancy bug (different method)"
    IMPACT = DetectorClassification.HIGH
    CONFIDENCE = DetectorClassification.HIGH

    def __init__(self, addresses=None, **kwargs):
        """
        :param addresses: optional collection of attacker addresses; when None, any
            called address without code is treated as attacker controlled
        """
        super().__init__(**kwargs)
        # TODO Check addresses are normal accounts. Heuristics implemented here
        # assume target addresses wont execute code. i.e. won't detect a Reentrancy
        # attack in progress but only a potential attack
        self._addresses = addresses

    @property
    def _read_storage_name(self):
        """Key under which the set of (address, offset) storage reads is kept in state.context."""
        return "{:s}.read_storage".format(self.name)

    @property
    def _locations_name(self):
        """Key under which the {location: reads} mapping is kept in state.context."""
        return "{:s}.locations".format(self.name)

    def will_open_transaction_callback(self, state, tx):
        """Reset the read log and the saved locations on every new human transaction."""
        if tx.is_human:
            state.context[self._read_storage_name] = set()
            state.context[self._locations_name] = dict()

    def did_close_transaction_callback(self, state, tx):
        """
        After an internal transaction closes, save the current location (and the
        storage reads so far) if the call looks exploitable for reentrancy.
        """
        # Only internal transactions are of interest
        if tx.is_human:
            return
        # The call must have been successful
        if not tx.result:
            return
        # Check if gas was enough for a reentrancy attack
        # NOTE(review): the class docstring says ">2300" while this check is ">= 2300" — confirm which is intended
        if not state.can_be_true(Operators.UGE(tx.gas, 2300)):
            return

        # Check if target address is attacker controlled
        if self._addresses is None:
            attacker_controlled = not state.platform.get_code(tx.address)
        else:
            attacker_controlled = tx.address in self._addresses

        if attacker_controlled:
            # that's enough. Save current location and read list
            self._save_location_and_reads(state)

    def _save_location_and_reads(self, state):
        """Map the current location to a snapshot of the storage slots read so far."""
        # A real dict is the default here; passing the `dict` type itself would
        # make the item assignment below fail when the key is missing.
        locations = state.context.setdefault(self._locations_name, {})
        world = state.platform
        address = world.current_vm.address
        pc = world.current_vm.pc
        if isinstance(pc, Constant):
            pc = pc.value
        assert isinstance(pc, int)
        at_init = world.current_transaction.sort == "CREATE"
        location = (address, pc, "Reentrancy multi-million ether bug", at_init)
        locations[location] = set(state.context.get(self._read_storage_name, ()))

    def _get_location_and_reads(self, state):
        """Return the (location, reads) pairs saved for this state (empty if none were saved)."""
        return state.context.get(self._locations_name, {}).items()

    def did_evm_read_storage_callback(self, state, address, offset, value):
        """Log every storage read as an (address, offset) pair."""
        state.context.setdefault(self._read_storage_name, set()).add((address, offset))

    def did_evm_write_storage_callback(self, state, address, offset, value):
        """
        Report a finding when storage is written at a slot that may equal one
        read before a previously saved call location.
        """
        # if in potential DAO check that write to storage values read before
        # the "send"
        for location, reads in self._get_location_and_reads(state):
            for address_i, offset_i in reads:
                if address_i == address:
                    if state.can_be_true(offset == offset_i):
                        # location is (address, pc, finding, at_init)
                        self.add_finding(state, *location)


class DetectIntegerOverflow(Detector):
    """
    Detects potential overflow and underflow conditions on ADD, SUB and MUL instructions.

    Results of those instructions are tainted with the id of a saved location
    carrying the overflow condition. The location is promoted to a finding when
    a tainted value is stored (SSTORE) or returned from a human transaction
    (RETURN) and the condition is satisfiable.
    """

    ARGUMENT = "overflow"
    HELP = "Integer overflows"
    IMPACT = DetectorClassification.HIGH
    CONFIDENCE = DetectorClassification.HIGH

    @staticmethod
    def _signed_sub_overflow(state, a, b):
        """
        Sign extend the value to 512 bits and check the result can be represented
         in 256. Following there is a 32 bit excerpt of this condition:
        a  -  b   -80000000 -3fffffff -00000001 +00000000 +00000001 +3fffffff +7fffffff
        +80000000    False    False    False    False     True     True     True
        +c0000001    False    False    False    False    False    False     True
        +ffffffff    False    False    False    False    False    False    False
        +00000000     True    False    False    False    False    False    False
        +00000001     True    False    False    False    False    False    False
        +3fffffff     True    False    False    False    False    False    False
        +7fffffff     True     True     True    False    False    False    False

        :param state: unused
        :return: condition that is true when the signed result of a - b does not fit in 256 bits
        """
        sub = Operators.SEXTEND(a, 256, 512) - Operators.SEXTEND(b, 256, 512)
        cond = Operators.OR(sub < -(1 << 255), sub >= (1 << 255))
        return cond

    @staticmethod
    def _signed_add_overflow(state, a, b):
        """
        Sign extend the value to 512 bits and check the result can be represented
         in 256. Following there is a 32 bit excerpt of this condition:

        a  +  b   -80000000 -3fffffff -00000001 +00000000 +00000001 +3fffffff +7fffffff
        +80000000     True     True     True    False    False    False    False
        +c0000001     True    False    False    False    False    False    False
        +ffffffff     True    False    False    False    False    False    False
        +00000000    False    False    False    False    False    False    False
        +00000001    False    False    False    False    False    False     True
        +3fffffff    False    False    False    False    False    False     True
        +7fffffff    False    False    False    False     True     True     True

        :param state: unused
        :return: condition that is true when the signed result of a + b does not fit in 256 bits
        """
        add = Operators.SEXTEND(a, 256, 512) + Operators.SEXTEND(b, 256, 512)
        cond = Operators.OR(add < -(1 << 255), add >= (1 << 255))
        return cond

    @staticmethod
    def _unsigned_sub_overflow(state, a, b):
        """
        Unsigned a - b wraps around exactly when b is (unsigned) greater than a.

        :param state: unused
        :return: the condition `b >u a`
        """
        cond = Operators.UGT(b, a)
        return cond

    @staticmethod
    def _unsigned_add_overflow(state, a, b):
        """
        Zero extend the values to 512 bits and check whether the sum needs more
        than 256 bits.

        :param state: unused
        :return: condition that is true when the unsigned result of a + b is >= 2**256
        """
        add = Operators.ZEXTEND(a, 512) + Operators.ZEXTEND(b, 512)
        cond = Operators.UGE(add, 1 << 256)
        return cond

    @staticmethod
    def _signed_mul_overflow(state, a, b):
        """
        Sign extend the values to 512 bits and check the product can be
        represented as a signed 256 bit number.

        :param state: unused
        :return: condition that is true when the signed result of a * b does not fit in 256 bits
        """
        mul = Operators.SEXTEND(a, 256, 512) * Operators.SEXTEND(b, 256, 512)
        cond = Operators.OR(mul < -(1 << 255), mul >= (1 << 255))
        return cond

    @staticmethod
    def _unsigned_mul_overflow(state, a, b):
        """
        Zero extend the values to 512 bits and check the result can be represented
         in 256. Following there is a 32 bit excerpt of this condition
         (products marked with `*` do not fit in 32 bits):

        a  *  b           +00000000000000000 +00000000000000001 +0000000003fffffff +0000000007fffffff +00000000080000001 +000000000bfffffff +000000000ffffffff
        +0000000000000000  +0000000000000000  +0000000000000000  +0000000000000000  +0000000000000000  +0000000000000000  +0000000000000000  +0000000000000000
        +0000000000000001  +0000000000000000  +0000000000000001  +000000003fffffff  +000000007fffffff  +0000000080000001  +00000000bfffffff  +00000000ffffffff
        +000000003fffffff  +0000000000000000  +000000003fffffff *+0fffffff80000001 *+1fffffff40000001 *+1fffffffbfffffff *+2fffffff00000001 *+3ffffffec0000001
        +000000007fffffff  +0000000000000000  +000000007fffffff *+1fffffff40000001 *+3fffffff00000001 *+3fffffffffffffff *+5ffffffec0000001 *+7ffffffe80000001
        +0000000080000001  +0000000000000000  +0000000080000001 *+1fffffffbfffffff *+3fffffffffffffff *+4000000100000001 *+600000003fffffff *+800000007fffffff
        +00000000bfffffff  +0000000000000000  +00000000bfffffff *+2fffffff00000001 *+5ffffffec0000001 *+600000003fffffff *+8ffffffe80000001 *+bffffffe40000001
        +00000000ffffffff  +0000000000000000  +00000000ffffffff *+3ffffffec0000001 *+7ffffffe80000001 *+800000007fffffff *+bffffffe40000001 *+fffffffe00000001

        :param state: unused
        :return: condition that is true when the unsigned result of a * b is >= 2**256
        """
        # Operands are unsigned here, so they must be zero extended. Sign extending
        # would turn any operand with the top bit set into a huge 512 bit value and
        # flag e.g. (2**256 - 1) * 1 as an overflow.
        mul = Operators.ZEXTEND(a, 512) * Operators.ZEXTEND(b, 512)
        cond = Operators.UGE(mul, 1 << 256)
        return cond

    def _check_finding(self, state, what):
        """
        Promote to findings the saved overflow locations whose taint is carried by `what`.

        Values tainted "SIGNED" are checked against signed overflow taints
        (IOS_*); all other values against unsigned overflow taints (IOU_*).
        :param state: current state
        :param what: value being stored or returned
        """
        prefix = "IOS_" if istainted(what, "SIGNED") else "IOU_"
        for taint in get_taints(what, prefix + ".*"):
            # The location id follows the taint prefix
            location_id = taint[len(prefix) :]
            address, pc, finding, at_init, condition = self._get_location(state, location_id)
            if state.can_be_true(condition):
                self.add_finding(state, address, pc, finding, at_init, condition)

    def did_evm_execute_instruction_callback(self, state, instruction, arguments, result):
        """
        Taint arithmetic results with their overflow conditions and check values
        that leave the computation (SSTORE, RETURN to a human) for such taints.
        """
        vm = state.platform.current_vm
        mnemonic = instruction.semantics
        ios = False
        iou = False

        if mnemonic == "ADD":
            ios = self._signed_add_overflow(state, *arguments)
            iou = self._unsigned_add_overflow(state, *arguments)
        elif mnemonic == "MUL":
            ios = self._signed_mul_overflow(state, *arguments)
            iou = self._unsigned_mul_overflow(state, *arguments)
        elif mnemonic == "SUB":
            ios = self._signed_sub_overflow(state, *arguments)
            iou = self._unsigned_sub_overflow(state, *arguments)
        elif mnemonic == "SSTORE":
            # If an overflowed value is stored in the storage then it is a finding
            # Todo: save this in a stack and only do the check if this does not
            #  revert/rollback
            _, what = arguments
            self._check_finding(state, what)
        elif mnemonic == "RETURN":
            world = state.platform
            if world.current_transaction.is_human:
                # If an overflowed value is returned to a human
                offset, size = arguments
                data = world.current_vm.read_buffer(offset, size)
                self._check_finding(state, data)

        # Results of signed operations are marked so that later checks use the signed condition
        if mnemonic in ("SLT", "SGT", "SDIV", "SMOD"):
            result = taint_with(result, "SIGNED")
        if mnemonic in ("ADD", "SUB", "MUL"):
            id_val = self._save_current_location(
                state, "Signed integer overflow at %s instruction" % mnemonic, ios
            )
            result = taint_with(result, "IOS_{:s}".format(id_val))

            id_val = self._save_current_location(
                state, "Unsigned integer overflow at %s instruction" % mnemonic, iou
            )
            result = taint_with(result, "IOU_{:s}".format(id_val))

        # Push the (re)tainted result back into the vm
        if mnemonic in ("SLT", "SGT", "SDIV", "SMOD", "ADD", "SUB", "MUL"):
            vm.change_last_result(result)


class DetectUnusedRetVal(Detector):
    """Detects unused return value from internal transactions"""

    ARGUMENT = "unused-return"
    HELP = "Unused internal transaction return values"
    IMPACT = DetectorClassification.LOW
    CONFIDENCE = DetectorClassification.HIGH

    def __init__(self, *args, **kwargs):
        """Set up the context key of the per-transaction stack of pending return value taints."""
        super().__init__(*args, **kwargs)
        self._stack_name = "{:s}.stack".format(self.name)

    def _add_retval_taint(self, state, taint):
        """Add `taint` to the taint set of the innermost open transaction."""
        frames = state.context[self._stack_name]
        pending = frames[-1]
        pending.add(taint)
        frames[-1] = pending

    def _remove_retval_taint(self, state, taint):
        """Remove `taint` from the taint set of the innermost open transaction, if present."""
        frames = state.context[self._stack_name]
        pending = frames[-1]
        if taint not in pending:
            return
        pending.remove(taint)
        frames[-1] = pending

    def _get_retval_taints(self, state):
        """Return the taint set of the innermost open transaction."""
        frames = state.context[self._stack_name]
        return frames[-1]

    def will_open_transaction_callback(self, state, tx):
        """Push a fresh taint set for the transaction; human transactions restart the stack."""
        # Reset reading log on new human transactions
        if tx.is_human:
            state.context[self._stack_name] = []
        state.context[self._stack_name].append(set())

    def did_close_transaction_callback(self, state, tx):
        """Report every return value still pending when the transaction closes, then pop its taint set."""
        prefix_len = len("RETVAL_")
        # Check that all retvals were used in control flow
        for taint in self._get_retval_taints(state):
            location_id = taint[prefix_len:]
            address, pc, finding, at_init, condition = self._get_location(state, location_id)
            if state.can_be_true(condition):
                self.add_finding(state, address, pc, finding, at_init)

        state.context[self._stack_name].pop()

    def did_evm_execute_instruction_callback(self, state, instruction, arguments, result):
        """
        Taint the result of transaction-starting instructions and clear the
        taints that reach the condition of a JUMPI.
        """
        mnemonic = instruction.semantics
        vm = state.platform.current_vm
        if instruction.is_starttx:
            # A transactional instruction just returned so we add a taint to result
            # and add that taint to the set
            location_id = self._save_current_location(
                state, "Returned value at {:s} instruction is not used".format(mnemonic)
            )
            taint = "RETVAL_{:s}".format(location_id)
            vm.change_last_result(taint_with(result, taint))
            self._add_retval_taint(state, taint)
        elif mnemonic == "JUMPI":
            _dest, cond = arguments
            # A return value feeding a conditional jump counts as used
            for used_taint in get_taints(cond, "RETVAL_.*"):
                self._remove_retval_taint(state, used_taint)


class DetectDelegatecall(Detector):
    """
    Detects DELEGATECALLs to controlled addresses and/or with controlled function id.
    This detector reports any delegatecall instruction for which any of the following holds:
        * the destination address can be controlled by the caller
        * the first 4 bytes of the calldata are controlled by the caller
    """

    ARGUMENT = "delegatecall"
    HELP = "Problematic uses of DELEGATECALL instruction"
    IMPACT = DetectorClassification.HIGH
    CONFIDENCE = DetectorClassification.HIGH

    def _to_constant(self, expression):
        """Unwrap a Constant expression to its value; anything else is returned untouched."""
        return expression.value if isinstance(expression, Constant) else expression

    def will_evm_execute_instruction_callback(self, state, instruction, arguments):
        """
        Check a DELEGATECALL about to execute for a destination address or a
        function id that has more than one possible solution.
        """
        # If it executed a DELEGATECALL
        # TODO: Check the transaction was success
        # if blockchain.last_transaction.return_value:
        # TODO: check if any of the potential target addresses has code
        # if not any( world.get_code, possible_addresses):
        if instruction.semantics != "DELEGATECALL":
            return

        _gas, address, in_offset, in_size, _out_offset, _out_size = arguments

        # More than one feasible destination => reported as user controlled
        if issymbolic(address) and len(state.solve_n(address, 2)) > 1:
            self.add_finding_here(state, "Delegatecall to user controlled address")

        calldata = state.platform.current_vm.read_buffer(
            self._to_constant(in_offset), self._to_constant(in_size)
        )
        # The function id is the first 4 bytes of the call data
        func_id = calldata[:4]
        if issymbolic(func_id) and len(state.solve_n(func_id, 2)) > 1:
            self.add_finding_here(state, "Delegatecall to user controlled function")


class DetectUninitializedMemory(Detector):
    """
    Detects uses of uninitialized memory
    """

    ARGUMENT = "uninitialized-memory"
    HELP = "Uninitialized memory usage"
    IMPACT = DetectorClassification.MEDIUM
    CONFIDENCE = DetectorClassification.HIGH

    def did_evm_read_memory_callback(self, state, offset, value, size):
        """
        Report a finding when the offsets just read may differ from every
        offset recorded as written for the current contract.
        """
        context_key = "{:s}.initialized_memory".format(self.name)
        written = state.context.get(context_key, set())
        current_contract = state.platform.current_vm.address

        # "Can be unknown": conjunction of "read offset != written offset" over all known writes
        cbu = True
        for known_contract, known_offset in written:
            if current_contract == known_contract:
                for delta in range(size):
                    cbu = Operators.AND(cbu, (offset + delta) != known_offset)

        if not state.can_be_true(cbu):
            return
        self.add_finding_here(
            state,
            "Potentially reading uninitialized memory at instruction (address: %r, offset %r)"
            % (current_contract, offset),
        )

    def did_evm_write_memory_callback(self, state, offset, value, size):
        """Record every offset covered by the write as initialized for the current contract."""
        context_key = "{:s}.initialized_memory".format(self.name)
        current_contract = state.platform.current_vm.address

        # concrete or symbolic write
        for delta in range(size):
            written = state.context.setdefault(context_key, set())
            written.add((current_contract, offset + delta))


class DetectUninitializedStorage(Detector):
    """
    Detects uses of uninitialized storage
    """

    ARGUMENT = "uninitialized-storage"
    HELP = "Uninitialized storage usage"
    IMPACT = DetectorClassification.MEDIUM
    CONFIDENCE = DetectorClassification.HIGH

    def did_evm_read_storage_callback(self, state, address, offset, value):
        """
        Report a finding when a possibly non-zero value is read from a slot
        that may differ from every (address, offset) recorded as written.
        """
        # Not initialized memory should be zero
        if not state.can_be_true(value != 0):
            return

        # check if offset is known
        context_name = "{:s}.initialized_storage".format(self.name)
        written_slots = state.context.get(context_name, ())
        cbu = True  # Can be unknown
        for known_address, known_offset in written_slots:
            differs = Operators.OR(address != known_address, offset != known_offset)
            cbu = Operators.AND(cbu, differs)

        if not state.can_be_true(cbu):
            return
        self.add_finding_here(state, "Potentially reading uninitialized storage", cbu)

    def did_evm_write_storage_callback(self, state, address, offset, value):
        """Record the written (address, offset) slot as initialized."""
        context_name = "{:s}.initialized_storage".format(self.name)
        # concrete or symbolic write
        written_slots = state.context.setdefault(context_name, set())
        written_slots.add((address, offset))


class DetectRaceCondition(Detector):
    """
    Detects possible transaction race conditions (transaction order dependencies)

    Values written to storage from within a user function are tainted with the storage
    slot they were written to. When a value carrying such a taint is later used as an
    instruction argument, a finding naming both the writing and the using function is
    reported.

    The RaceCondition detector might not work properly for contracts that have only a fallback function.
    See the detector's implementation and its `_in_user_func` method for more information.
    """

    ARGUMENT = "race-condition"
    HELP = "Possible transaction race conditions"
    IMPACT = DetectorClassification.LOW
    CONFIDENCE = DetectorClassification.LOW

    # Prefix of the taint (and of the state-context key) attached to stored values;
    # the storage index/slot is appended to it.
    TAINT = "written_storage_slots."

    def __init__(self, *a, **kw):
        # Normally `add_finding_here` makes it unique reporting but
        # we might try to report the same thing multiple times e.g. in consecutive instructions
        # so we need to make our own 'unique findings' set too.
        self.__findings = set()
        super().__init__(*a, **kw)

    @staticmethod
    def _in_user_func(state):
        """
        :param state: current state
        :return: whether the current execution is in a user-defined function or not.

        NOTE / TODO / FIXME: As this may produce false positives, this is not in the base `Detector` class.
        It should be fixed at some point and moved there. See below.

        The first 4 bytes of tx data is keccak256 hash of the function signature that is called by given tx.

        All transactions start within Solidity dispatcher function: it takes passed hash and dispatches
        the execution to given function based on it.

        So: if we are in the dispatcher, *and the contract has some functions*, the first four tx data bytes
        will effectively have more than one solution.

        BUT if the contract has only a fallback function, the equation below may return more solutions when
        we are in a dispatcher function.  <--- because of that, we warn that the detector is not that stable
        for contracts with only a fallback function.
        """

        # If we are already in user function (we cached it) let's just return True
        in_function = state.context.get("in_function", False)
        prev_tx_count = state.context.get("prev_tx_count", 0)
        curr_tx_count = len(state.platform.transactions)

        # The cached answer is only valid while the transaction count is unchanged
        new_human_tx = prev_tx_count != curr_tx_count

        if in_function and not new_human_tx:
            return True

        # This is expensive call, so we cache it
        in_function = len(state.solve_n(state.platform.current_transaction.data[:4], 2)) == 1

        state.context["in_function"] = in_function
        state.context["prev_tx_count"] = curr_tx_count

        return in_function

    def did_evm_write_storage_callback(self, state, storage_address, offset, value):
        """
        Taint a value being stored and remember which function stored it.

        :param state: current state
        :param storage_address: address whose storage is written
        :param offset: storage index/slot being written
        :param value: value being stored
        """
        world = state.platform
        curr_tx = world.current_transaction

        if curr_tx.sort == "CREATE" or not self._in_user_func(state):
            return

        # Without metadata the writing function cannot be named. Bail out *before*
        # tainting, so a taint never exists without its entry in the state context
        # and contracts lacking metadata do not raise KeyError here.
        if curr_tx.address not in self.manticore.metadata:
            return

        key = self.TAINT + str(offset)  # offset is storage index/slot

        # Taint stored value so we will know if it is used later on
        result = taint_with(value, key)
        world.set_storage_data(storage_address, offset, result)

        metadata = self.manticore.metadata[curr_tx.address]
        func_sig = metadata.get_func_signature(state.solve_one(curr_tx.data[:4]))

        # Save signature of function that tainted the value at given storage index
        state.context.setdefault(key, set()).add(func_sig)

    def did_evm_execute_instruction_callback(self, state, instruction, arguments, result_ref):
        """
        Report instruction arguments that carry a storage-slot taint set by this detector.

        :param state: current state
        :param instruction: instruction that has been executed (not inspected here)
        :param arguments: arguments of the instruction
        :param result_ref: result of the instruction (not inspected here)
        """
        if not self._in_user_func(state):
            return

        # We won't be able to add a finding if pc is not a constant value
        if not isinstance(state.platform.current_vm.pc, (int, Constant)):
            return

        curr_tx = state.platform.current_transaction

        if curr_tx.sort == "CREATE" or curr_tx.address not in self.manticore.metadata:
            return

        # Resolving the current function needs a solver query, so only do it
        # when at least one argument is tainted at all.
        tainted_args = [arg for arg in arguments if istainted(arg)]
        if not tainted_args:
            return

        metadata = self.manticore.metadata[curr_tx.address]
        curr_func = metadata.get_func_signature(state.solve_one(curr_tx.data[:4]))

        for arg in tainted_args:
            for taint in get_taints(arg, self.TAINT + "*"):
                # The part after the last dot is the textual form of the storage index
                tainted_val = taint[taint.rindex(".") + 1 :]

                try:
                    storage_index = int(tainted_val)
                    storage_index_key = storage_index
                except ValueError:
                    storage_index = "which is symbolic"
                    storage_index_key = hash(tainted_val)

                prev_funcs = state.context[taint]

                for prev_func in prev_funcs:
                    # if prev_func is None, it didn't have a signature (so it was a dispatcher function)
                    if prev_func is None:
                        continue

                    unique_key = (storage_index_key, prev_func, curr_func)
                    if unique_key in self.__findings:
                        continue
                    self.__findings.add(unique_key)

                    msg = "Potential race condition (transaction order dependency):\n"
                    msg += (
                        f"Value has been stored in storage slot/index {storage_index} in transaction that "
                        f"called {prev_func} and is now used in transaction that calls {curr_func}.\n"
                        f"An attacker seeing a transaction to {curr_func} could create a transaction "
                        f"to {prev_func} with high gas and win a race."
                    )
                    self.add_finding_here(state, msg)


class DetectManipulableBalance(Detector):
    """
    Detects the use of manipulable balance in strict compare.

    The result of every BALANCE instruction is tainted; an EQ instruction that
    receives an operand carrying that taint is reported.
    """

    ARGUMENT = "lockdrop"
    HELP = "Use balance in EQ"
    IMPACT = DetectorClassification.HIGH
    CONFIDENCE = DetectorClassification.HIGH

    def did_evm_execute_instruction_callback(self, state, instruction, arguments, result):
        """
        Taint BALANCE results and flag EQ instructions that consume them.

        :param state: current state
        :param instruction: instruction that has been executed
        :param arguments: arguments of the instruction
        :param result: result of the instruction
        """
        current_vm = state.platform.current_vm
        opcode = instruction.semantics

        if opcode == "BALANCE":
            # Swap the produced value for a tainted copy so later uses can be recognised
            current_vm.change_last_result(taint_with(result, "BALANCE"))
            return

        if opcode != "EQ":
            return

        # Strict comparison: look for operands derived from a balance
        for operand in arguments:
            if istainted(operand, "BALANCE"):
                self.add_finding_here(state, "Manipulable balance used in a strict comparison")