manticore/ethereum/manticore.py
import binascii
import json
import logging
from multiprocessing import Queue, Process
from queue import Empty as EmptyQueue
from typing import Dict, Optional, Union
import io
import pyevmasm as EVMAsm
import random
import tempfile
import time
from crytic_compile import CryticCompile, InvalidCompilation, is_supported
from ..core.manticore import ManticoreBase, ManticoreError
from ..core.smtlib import (
ConstraintSet,
Array,
ArrayProxy,
BitVec,
Operators,
BoolConstant,
Expression,
issymbolic,
SelectedSolver,
)
from ..core.state import AbandonState
from .account import EVMContract, EVMAccount, ABI
from .detectors import Detector
from .solidity import SolidityMetadata
from .state import State
from ..exceptions import EthereumError, DependencyError, NoAliveStates
from ..platforms import evm
from ..utils import config
from ..utils.deprecated import deprecated
from ..utils.enums import Sha3Type
from ..utils.helpers import PickleSerializer, printable_bytes
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Only let records of level ERROR or above through on the "CryticCompile" logger.
logging.getLogger("CryticCompile").setLevel(logging.ERROR)
# Configuration group that holds the EVM settings registered below.
consts = config.get_group("evm")
# Gas budget used by ManticoreEVM._transaction when the caller passes gas=None.
consts.add("defaultgas", 3000000, "Default gas value for ethereum transactions.")
# Strategy for symbolic sha3; ManticoreEVM.__init__ reads it to choose which
# handler is subscribed to the "on_symbolic_function" event.
consts.add(
"sha3",
default=Sha3Type.symbolicate,
description="concretize: sound simple concretization\nsymbolicate(*): unsound symbolication with an out of cycle false positive killing\nfake: let the solver choose a function to emulate some of the hash characteristics (This potentially produces wrong testcases) ",
)
# NOTE(review): 60 * 60 is presumably expressed in seconds (one hour) - confirm
# against the code that consumes `sha3timeout`.
consts.add(
"sha3timeout",
60 * 60,
"Default timeout for matching sha3 for unsound states (see unsound symbolication).",
)
# Off by default; the consumer of this flag is not in this part of the file.
consts.add(
"events",
False,
"Show EVM events in the testcases.",
)
def flagged(flag):
    """
    Return the marker appended to values that were concretized.

    :param flag: truthy when concretization happened
    :return: ``"(*)"`` when ``flag`` is truthy, otherwise the empty string
    """
    if flag:
        return "(*)"
    return ""
def write_findings(method, lead_space, address, pc, at_init=""):
    """
    Write the location of a finding: the contract address on one line and the
    EVM program counter on the next.

    :param method: object exposing a ``write(str)`` method (e.g. an open text stream)
    :param lead_space: string prepended to each of the two lines
    :param address: contract address, rendered as ``0x``-prefixed hexadecimal
    :param pc: program counter, rendered as ``0x``-prefixed hexadecimal
    :param at_init: truthy when the finding happened in the constructor; the
        program-counter line is then suffixed with " (at constructor)"
    :return: None
    """
    # Each field goes on its own line; without the terminator the address and
    # the program counter would be glued together in the report.
    method.write(f"{lead_space}Contract: {address:#x}\n")
    constructor_note = " (at constructor)" if at_init else ""
    method.write(f"{lead_space}EVM Program counter: {pc:#x}{constructor_note}\n")
def calculate_coverage(runtime_bytecode, seen):
    """
    Compute the percentage of disassembled instructions whose program counter is in ``seen``.

    The bytecode is first passed through ``SolidityMetadata._without_metadata``.

    :param runtime_bytecode: runtime bytecode of the contract
    :param seen: container of visited program counters
    :return: ``0`` when there is nothing to disassemble, otherwise a float in [0, 100]
    """
    stripped = SolidityMetadata._without_metadata(runtime_bytecode)
    visited_flags = [instruction.pc in seen for instruction in EVMAsm.disassemble_all(stripped)]
    if not visited_flags:
        # No runtime_bytecode
        return 0
    return sum(visited_flags) * 100.0 / len(visited_flags)
class ManticoreEVM(ManticoreBase):
"""Manticore EVM manager
Usage Ex::
from manticore.ethereum import ManticoreEVM, ABI
m = ManticoreEVM()
#And now make the contract account to analyze
source_code = '''
pragma solidity ^0.4.15;
contract AnInt {
uint private i=0;
function set(uint value){
i=value;
}
}
'''
#Initialize user and contracts
user_account = m.create_account(balance=1000)
contract_account = m.solidity_create_contract(source_code, owner=user_account, balance=0)
contract_account.set(12345, value=100)
m.finalize()
"""
_published_events = {"solve"}
def make_symbolic_buffer(self, size, name=None, avoid_collisions=False):
    """Create a symbolic byte array of ``size`` elements for use in transactions.

    The array is allocated in ``self.constraints``; further constraints on it
    can be added with ``manticore.constrain(constraint_expression)``.

    Example use::
        symbolic_data = m.make_symbolic_buffer(320)
        m.constrain(symbolic_data[0] == 0x65)
        m.transaction(caller=attacker_account,
                      address=contract_account,
                      data=symbolic_data,
                      value=100000 )

    :param size: passed as ``index_max`` of the new array
    :param name: variable name; when omitted "TXBUFFER" is used and collision
        avoidance is forced on
    :param avoid_collisions: forwarded as-is when ``name`` is given
    """
    unnamed = name is None
    return self.constraints.new_array(
        index_bits=256,
        name="TXBUFFER" if unnamed else name,
        index_max=size,
        value_bits=8,
        taint=frozenset(),
        avoid_collisions=True if unnamed else avoid_collisions,
    )
def make_symbolic_value(self, nbits=256, name=None):
    """Create a symbolic bit-vector (256 bits wide by default) for use in transactions.

    The variable is allocated in ``self.constraints``; further constraints on
    it can be added with ``manticore.constrain(constraint_expression)``.

    Example use::
        symbolic_value = m.make_symbolic_value()
        m.constrain(symbolic_value > 100)
        m.constrain(symbolic_value < 1000)
        m.transaction(caller=attacker_account,
                      address=contract_account,
                      data=data,
                      value=symbolic_value )

    :param nbits: width of the bit-vector
    :param name: variable name; when omitted "SVALUE" is used with collision
        avoidance enabled
    """
    if name is None:
        return self.constraints.new_bitvec(nbits, name="SVALUE", avoid_collisions=True)
    return self.constraints.new_bitvec(nbits, name=name, avoid_collisions=False)
def make_symbolic_address(self, *accounts, name=None, select="both"):
    """
    Create a 160-bit symbolic address constrained to be 0 or one of the given accounts.

    :param accounts: candidate accounts; when none are given, every account
        registered in this manager is a candidate
    :param name: name of the symbolic variable; when omitted "TXADDR" is used
        with collision avoidance enabled
    :param select: "both", "normal" or "contract". With "contract" only
        candidates for which ``get_code`` is truthy are kept; with "normal"
        only those for which it is falsy.
    :raises EthereumError: if ``select`` is not one of the accepted values
    :return: the new symbolic address
    """
    if select not in ("both", "normal", "contract"):
        raise EthereumError("Wrong selection type")

    candidates = accounts if accounts else self._accounts.values()
    unnamed = name is None
    symbolic_address = self.constraints.new_bitvec(
        160, name="TXADDR" if unnamed else name, avoid_collisions=unnamed
    )

    allowed = symbolic_address == 0
    for account in candidates:
        if select != "both":
            # The code lookup is only performed when a filter was requested.
            is_contract = bool(self.get_code(account))
            if is_contract != (select == "contract"):
                continue
        allowed = Operators.OR(symbolic_address == int(account), allowed)

    self.constrain(allowed)
    return symbolic_address
def constrain(self, constraint):
    """
    Apply ``constraint`` to every state, or to the manager-level constraint
    set when ``count_states()`` reports zero states.

    :param constraint: constraint expression to add
    """
    if self.count_states() != 0:
        for state in self.all_states:
            state.constrain(constraint)
        return
    self.constraints.add(constraint)
@staticmethod
def _link(bytecode, libraries=None):
has_dependencies = "_" in bytecode
hex_contract = bytecode
if has_dependencies:
deps = {}
pos = 0
while pos < len(hex_contract):
if hex_contract[pos] == "_":
lib_placeholder = hex_contract[pos : pos + 40]
# This is all very weak...
# Contract names starting/ending with _ ?
# Contract names longer than 40 bytes ?
if ":" in lib_placeholder:
# __/tmp/tmp_9k7_l:Manticore______________
lib_name = lib_placeholder.split(":")[1].strip("_")
deps.setdefault(lib_name, []).append(pos)
else:
lib_name = lib_placeholder.strip("_")
deps.setdefault(lib_name, []).append(pos)
pos += 40
else:
pos += 2
if libraries is None:
raise DependencyError(deps.keys())
libraries = dict(libraries)
hex_contract_lst = list(hex_contract)
for lib_name, pos_lst in deps.items():
try:
lib_address = libraries[lib_name]
except KeyError:
raise DependencyError([lib_name])
for pos in pos_lst:
hex_contract_lst[pos : pos + 40] = "%040x" % int(lib_address)
hex_contract = "".join(hex_contract_lst)
return bytearray(binascii.unhexlify(hex_contract))
@staticmethod
def _compile_through_crytic_compile(filename, contract_name, libraries, crytic_compile_args):
"""
Compile ``filename`` with crytic-compile and extract the artifacts of one contract.

:param filename: filename to compile
:param contract_name: contract to extract; when falsy the first non-library contract of the
compilation unit is used, provided there is no more than one
:param libraries: an itemizable of pairs (library_name, address)
:param crytic_compile_args: crytic compile options (https://github.com/crytic/crytic-compile/wiki/Configuration)
:type crytic_compile_args: dict
:raises EthereumError: if the contract is shared between compilation units, no contract was selected and
more than one is available, the declaring file cannot be found, or compilation fails
:raises ValueError: if ``contract_name`` is not among the compiled contract names
:raises DependencyError: if the contract needs libraries that are not present in ``libraries``
:return: tuple (name, source_code, bytecode, runtime, srcmap, srcmap_runtime, hashes, abi)
"""
try:
# Keyword options are only forwarded when some were supplied.
if crytic_compile_args:
crytic_compile = CryticCompile(filename, **crytic_compile_args)
else:
crytic_compile = CryticCompile(filename)
if crytic_compile.is_in_multiple_compilation_unit(contract_name):
raise EthereumError(
f"{contract_name} is shared in multiple compilation units, please split the codebase to prevent the duplicate"
)
# NOTE(review): the `return` below appears to sit inside this loop, so only the first
# compilation unit would ever be inspected - confirm against the properly indented file.
for compilation_unit in crytic_compile.compilation_units.values():
# No contract selected by the caller: default to the first non-library contract.
if not contract_name:
if len(compilation_unit.contracts_names_without_libraries) > 1:
raise EthereumError(
f"Solidity file must contain exactly one contract or you must select one. Contracts found: {', '.join(compilation_unit.contracts_names)}"
)
contract_name = list(compilation_unit.contracts_names_without_libraries)[0]
if contract_name not in compilation_unit.contracts_names:
raise ValueError(f"Specified contract not found: {contract_name}")
name = contract_name
# Libraries the contract depends on; those already present in `libraries` are dropped
# and any remainder is reported through DependencyError.
libs = compilation_unit.libraries_names(name)
if libraries:
libs = [l for l in libs if l not in libraries]
if libs:
raise DependencyError(libs)
# The compiler hands back hex strings; decode them into raw bytes.
bytecode = bytes.fromhex(compilation_unit.bytecode_init(name, libraries))
runtime = bytes.fromhex(compilation_unit.bytecode_runtime(name, libraries))
srcmap = compilation_unit.srcmap_init(name)
srcmap_runtime = compilation_unit.srcmap_runtime(name)
hashes = compilation_unit.hashes(name)
abi = compilation_unit.abi(name)
# Find the file that declares the contract so its source text can be read back.
filename = None
for _fname, contracts in compilation_unit.filename_to_contracts.items():
if name in contracts:
filename = _fname.absolute
break
if filename is None:
raise EthereumError(
f"Could not find a contract named {name}. Contracts found: {', '.join(compilation_unit.contracts_names)}"
)
with open(filename) as f:
source_code = f.read()
return name, source_code, bytecode, runtime, srcmap, srcmap_runtime, hashes, abi
# Compiler failures are reported to callers as EthereumError.
except InvalidCompilation as e:
raise EthereumError(
f"Errors : {e}\n. Solidity failed to generate bytecode for your contract. Check if all the abstract functions are implemented. "
)
@staticmethod
def _compile(source_code, contract_name, libraries=None, crytic_compile_args=None):
    """Compile a Solidity contract, used internally

    When ``source_code`` is a string that ``is_supported`` does not recognise
    (and "ignore_compile" is not among the options) it is written to a
    temporary ``.sol`` file that is compiled instead.

    :param source_code: solidity source
    :type source_code: string (filename, directory, etherscan address) or a file handle
    :param contract_name: a string with the name of the contract to analyze
    :param libraries: an itemizable of pairs (library_name, address)
    :param crytic_compile_args: crytic compile options (https://github.com/crytic/crytic-compile/wiki/Configuration)
    :type crytic_compile_args: dict
    :return: name, source_code, bytecode, runtime, srcmap, srcmap_runtime, hashes, abi, warnings
        (``warnings`` is always the empty string)
    """
    if crytic_compile_args is None:
        crytic_compile_args = dict()

    # File handles are compiled through the path they were opened from.
    if isinstance(source_code, io.IOBase):
        source_code = source_code.name

    is_inline_source = (
        isinstance(source_code, str)
        and not is_supported(source_code)
        # Until https://github.com/crytic/crytic-compile/issues/103 is implemented
        and "ignore_compile" not in crytic_compile_args
    )

    if not is_inline_source:
        compiled = ManticoreEVM._compile_through_crytic_compile(
            source_code, contract_name, libraries, crytic_compile_args
        )
    else:
        with tempfile.NamedTemporaryFile("w+", suffix=".sol") as scratch:
            scratch.write(source_code)
            scratch.flush()
            compiled = ManticoreEVM._compile_through_crytic_compile(
                scratch.name, contract_name, libraries, crytic_compile_args
            )

    name, source_code, bytecode, runtime, srcmap, srcmap_runtime, hashes, abi = compiled
    return (name, source_code, bytecode, runtime, srcmap, srcmap_runtime, hashes, abi, "")
@property
def accounts(self):
return dict(self._accounts)
def account_name(self, address):
    """
    Return the registered name of the account at ``address``.

    :param address: address to look up
    :return: the name of the first registered account whose ``address``
        matches, or ``address`` formatted as ``0x``-prefixed hexadecimal
    """
    for label in self._accounts:
        if self._accounts[label].address == address:
            return label
    return "0x{:x}".format(address)
@property
def normal_accounts(self):
    """Return the registered accounts that are not ``EVMContract`` instances, keyed by name."""
    regular = {}
    for label, account in self._accounts.items():
        if not isinstance(account, EVMContract):
            regular[label] = account
    return regular
@property
def contract_accounts(self):
    """Return the registered accounts that are ``EVMContract`` instances, keyed by name."""
    contracts = {}
    for label, account in self._accounts.items():
        if isinstance(account, EVMContract):
            contracts[label] = account
    return contracts
def get_account(self, name):
    """
    Return the account registered under ``name``.

    :raises KeyError: if no account has that name
    """
    registry = self._accounts
    return registry[name]
def __init__(self, plugins=None, **kwargs):
    """
    A Manticore EVM manager

    Builds a fresh constraint set, an EVM world on top of it and the initial
    state, then wires the event callbacks this manager relies on.

    :param plugins: the plugins to register in this manticore manager
    :param kwargs: forwarded to the base class constructor
    """
    # The constraint store is shared by the world, the initial state and the manager.
    constraints = ConstraintSet()
    initial_state = State(constraints, evm.EVMWorld(constraints))
    super().__init__(initial_state, **kwargs)

    if plugins is not None:
        for plugin in plugins:
            self.register_plugin(plugin)

    self.subscribe("will_terminate_state", self._terminate_state_callback)
    self.subscribe("did_evm_execute_instruction", self._did_evm_execute_instruction_callback)

    # Pick the handler for symbolic functions from the configured sha3 mode;
    # "symbolicate" and "fake" share the same handler.
    sha3_mode = consts.sha3
    if sha3_mode is sha3_mode.concretize:
        self.subscribe("on_symbolic_function", self._on_concretize)
    elif sha3_mode is sha3_mode.symbolicate or sha3_mode is sha3_mode.fake:
        self.subscribe("on_symbolic_function", self._on_unsound_symbolication)

    self._accounts: Dict[str, EVMContract] = dict()
    self._serializer = PickleSerializer()
    self.constraints = constraints
    self.detectors: Dict[str, Detector] = {}
    self.metadata: Dict[int, SolidityMetadata] = {}
@property
def world(self):
"""The world instance or None if there is more than one state"""
return self.get_world()
# Deprecated in favor of iterating `m.all_states`.
@property  # type: ignore
@deprecated("You should iterate over `m.all_states` instead.")
def completed_transactions(self):
    """Return the "_completed_transactions" entry of the locked "ethereum" context."""
    with self.locked_context("ethereum") as ethereum_context:
        return ethereum_context["_completed_transactions"]
@deprecated("You should use the `platform` member of a `state` instance instead.")
def get_world(self, state_id=None):
    """
    Returns the evm world of `state_id` state.

    :param state_id: id of the state to load; defaults to the first entry of
        ``self._ready_states``
    :return: the ``platform`` of the loaded state, or None if loading yields None
    """
    if state_id is None:
        state_id = self._ready_states[0]
    loaded = self._load(state_id)
    return None if loaded is None else loaded.platform
@deprecated("Instead, call `get_balance` on a state instance")
def get_balance(self, address, state_id=None):
    """
    Balance for account `address` on state `state_id`

    :param address: int or EVMAccount (converted with ``int()``)
    :param state_id: forwarded to ``get_world``
    """
    target = int(address) if isinstance(address, EVMAccount) else address
    return self.get_world(state_id).get_balance(target)
@deprecated("Instead, call `get_storage_data` on a state instance")
def get_storage_data(self, address, offset, state_id=None):
    """
    Storage data for `offset` on account `address` on state `state_id`

    :param address: int or EVMAccount (converted with ``int()``)
    :param offset: storage offset forwarded to the world
    :param state_id: forwarded to ``get_world``
    """
    target = int(address) if isinstance(address, EVMAccount) else address
    return self.get_world(state_id).get_storage_data(target, offset)
@deprecated("Instead, call `get_code` on a state instance")
def get_code(self, address, state_id=None):
    """
    Code of account `address` on state `state_id`, as reported by the world's ``get_code``

    :param address: int or EVMAccount (converted with ``int()``)
    :param state_id: forwarded to ``get_world``
    """
    target = int(address) if isinstance(address, EVMAccount) else address
    return self.get_world(state_id).get_code(target)
@deprecated("Instead, use `state.last_transaction.return_data` on a state instance")
def last_return(self, state_id=None):
    """Last returned buffer for state `state_id`"""
    # NOTE(review): this goes through `self.load` while `get_world` and
    # `transactions` use `self._load` - confirm the base class provides `load`.
    return self.load(state_id).platform.last_transaction.return_data
@deprecated("Instead, use `state.transactions` on a state instance")
def transactions(self, state_id=None):
    """Transactions list for state `state_id`"""
    return self._load(state_id).platform.transactions
@deprecated("Instead, use `state.transactions` on a state instance")
def human_transactions(self, state_id=None):
    """Value of ``platform.human_transactions`` for state `state_id`"""
    # NOTE(review): this goes through `self.load` while `get_world` and
    # `transactions` use `self._load` - confirm the base class provides `load`.
    return self.load(state_id).platform.human_transactions
def make_symbolic_arguments(self, types):
    """
    Build a reasonable set of symbolic arguments matching the types list

    :param types: type description parsed with ``abitypes.parse``
    :return: the symbolic arguments produced for the parsed type
    """
    from . import abitypes

    parsed_types = abitypes.parse(types)
    return self._make_symbolic_arguments(parsed_types)
def _make_symbolic_arguments(self, ty):
"""This makes a tuple of symbols to be used as arguments of type ty"""
# If the types describe an string or an array this will produce strings
# or arrays of a default size.
# TODO: add a configuration constant for these two
default_string_size = 32
default_array_size = 32
if ty[0] in ("int", "uint"):
result = self.make_symbolic_value()
elif ty[0] == "bytesM":
result = self.make_symbolic_buffer(size=ty[1])
elif ty[0] == "function":
address = self.make_symbolic_value()
func_id = self.make_symbolic_buffer(size=4)
result = (address, func_id)
elif ty[0] in ("bytes", "string"):
result = self.make_symbolic_buffer(size=default_string_size)
elif ty[0] == "tuple":
result = ()
for ty_i in ty[1]:
result += (self._make_symbolic_arguments(ty_i),)
elif ty[0] == "array":
result = []
rep = ty[1]
if rep is None:
rep = default_array_size
for _ in range(rep):
result.append(self._make_symbolic_arguments(ty[2]))
else:
raise NotImplementedError(f"Could not produce symbolic argument of type {ty[0]}")
return result
def solidity_create_contract(
self,
source_code,
owner,
name=None,
contract_name=None,
libraries=None,
balance=0,
address=None,
args=(),
gas=None,
compile_args=None,
):
"""Creates a solidity contract and library dependencies
:param source_code: solidity source code
:type source_code: string (filename, directory, etherscan address) or a file handle
:param owner: owner account (will be default caller in any transactions)
:type owner: int or EVMAccount
:param name: account name forwarded to `create_contract` for the main contract (optional)
:param contract_name: Name of the contract to analyze (optional if there is a single one in the source code)
:type contract_name: str
:param libraries: anything `dict()` accepts, mapping already deployed library names to addresses (optional)
:param balance: balance to be transferred on creation
:type balance: int or BitVecVariable
:param address: the address for the new contract (optional)
:type address: int or EVMAccount
:param tuple args: constructor arguments; pass None to have symbolic arguments generated
:param compile_args: crytic compile options #FIXME(https://github.com/crytic/crytic-compile/wiki/Configuration)
:type compile_args: dict
:param gas: gas budget for each contract creation needed (may be more than one if several related contracts defined in the solidity source)
:type gas: int
:raises EthereumError: re-raised after logging and calling `self.kill()`
:rtype: EVMAccount
"""
if compile_args is None:
compile_args = dict()
# `deps` maps every contract/library deployed so far to its address.
if libraries is None:
deps = {}
else:
deps = dict(libraries)
# Work list of contracts still to build; the DependencyError handler below
# pushes the current contract back together with its missing libraries.
contract_names = [contract_name]
while contract_names:
contract_name_i = contract_names.pop()
try:
compile_results = self._compile(
source_code, contract_name_i, libraries=deps, crytic_compile_args=compile_args
)
md = SolidityMetadata(*compile_results)
# Only the requested contract receives constructor arguments, balance, address and name.
if contract_name_i == contract_name:
constructor_types = md.get_constructor_arguments()
if constructor_types != "()":
if args is None:
args = self.make_symbolic_arguments(constructor_types)
constructor_data = ABI.serialize(constructor_types, *args)
else:
constructor_data = b""
# Balance could be symbolic, lets ask the solver
# Option 1: balance can not be 0 and the function is marked as not payable
maybe_balance = balance == 0
self._publish(
"will_solve", None, self.constraints, maybe_balance, "can_be_true"
)
# NOTE: despite its name, this holds whether `balance == 0` is satisfiable.
must_have_balance = SelectedSolver.instance().can_be_true(
self.constraints, maybe_balance
)
self._publish(
"did_solve",
None,
self.constraints,
maybe_balance,
"can_be_true",
must_have_balance,
)
if not must_have_balance:
# balance always != 0
if not md.constructor_abi["payable"]:
raise EthereumError(
f"Can't create solidity contract with balance ({balance}) "
f"different than 0 because the contract's constructor is not payable."
)
# The owner must be able to afford `balance` in every ready state.
# NOTE(review): `owner.address` is read here, so a plain int owner would fail - confirm.
for state in self.ready_states:
world = state.platform
expr = Operators.UGE(world.get_balance(owner.address), balance)
self._publish("will_solve", None, self.constraints, expr, "can_be_true")
sufficient = SelectedSolver.instance().can_be_true(
self.constraints,
expr,
)
self._publish(
"did_solve", None, self.constraints, expr, "can_be_true", sufficient
)
if not sufficient:
raise EthereumError(
f"Can't create solidity contract with balance ({balance}) "
f"because the owner account ({owner}) has insufficient balance."
)
contract_account = self.create_contract(
owner=owner,
balance=balance,
address=address,
init=md._init_bytecode + constructor_data,
name=name,
gas=gas,
)
else:
# Dependencies are created with zero balance and no constructor data.
contract_account = self.create_contract(
owner=owner, init=md._init_bytecode, balance=0, gas=gas
)
if contract_account is None:
return None
self.metadata[int(contract_account)] = md
deps[contract_name_i] = int(contract_account)
except DependencyError as e:
# Retry this contract later, after the libraries that are not deployed yet.
contract_names.append(contract_name_i)
for lib_name in e.lib_names:
if lib_name not in deps:
contract_names.append(lib_name)
except EthereumError as e:
logger.info(f"Failed to build contract {contract_name_i} {str(e)}")
self.kill()
raise
# If the contract was created successfully in at least 1 state return account
for state in self.ready_states:
if state.platform.get_code(int(contract_account)):
return contract_account
logger.info("Failed to compile contract %r", contract_names)
def get_nonce(self, address):
    """
    Return the nonce of ``address``, which must agree across all ready states containing it.

    :param address: int or anything convertible with ``int()``
    :raises NoAliveStates: if no ready state contains the address
    :raises EthereumError: if ready states disagree on the nonce
    :return: the single nonce value
    """
    # type forgiveness:
    address = int(address)
    # Collect the distinct nonces seen in the states that know this address.
    nonces = {
        state.platform.get_nonce(address)
        for state in self.ready_states
        if address in state.platform
    }
    if not nonces:
        raise NoAliveStates("There are no alive states containing address %x" % address)
    if len(nonces) != 1:
        # if there are multiple states with this address, they all have to have the same nonce:
        raise EthereumError(
            "Cannot increase the nonce of address %x because it exists in multiple states with different nonces"
            % address
        )
    (nonce,) = nonces
    return nonce
def create_contract(self, owner, balance=0, address=None, init=None, name=None, gas=None):
    """Creates a contract

    :param owner: owner account (will be default caller in any transactions)
    :type owner: int or EVMAccount
    :param balance: balance to be transferred on creation
    :type balance: int or BitVecVariable
    :param int address: the address for the new contract (optional); when
        given it must equal the address derived from the owner and its nonce
    :param str init: initializing evm bytecode and arguments
    :param str name: a unique name for reference
    :param gas: gas budget for the creation/initialization of the contract
    :raises NoAliveStates: if there are no ready states
    :raises EthereumError: on an unexpected address or an already used name
    :rtype: EVMAccount, or None when no ready state is left after the transaction
    """
    if not self.count_ready_states():
        raise NoAliveStates

    nonce = self.get_nonce(owner)
    derived_address = evm.EVMWorld.calculate_new_address(int(owner), nonce=nonce)
    if address is None:
        address = derived_address
    elif address != derived_address:
        raise EthereumError(
            "Address was expected to be %x but was given %x" % (derived_address, address)
        )

    # Name check
    if name is None:
        name = self._get_uniq_name("contract")
    if name in self._accounts:
        # Account name already used
        raise EthereumError("Name already used")

    self._transaction("CREATE", owner, balance, address, data=init, gas=gas)
    # TODO detect failure in the constructor

    if not self.count_ready_states():
        logger.warning("No ready states")
        return None

    self._accounts[name] = EVMContract(
        address=address, manticore=self, default_caller=owner, name=name
    )
    return self.accounts[name]
def _get_uniq_name(self, stem):
count = 0
for name_i in self.accounts.keys():
if name_i.startswith(stem):
try:
count = max(count, int(name_i[len(stem) :]) + 1)
except Exception:
pass
name = "{:s}{:d}".format(stem, count)
assert name not in self.accounts
return name
def _all_addresses(self):
"""Returns all addresses in all running states"""
ret = set()
for state in self.ready_states:
ret |= set(state.platform.accounts)
return ret
def new_address(self):
    """
    Create a fresh 160bit address

    :return: a random integer in ``[100, 2**160 - 1]`` that is not an account
        address in any ready state
    """
    all_addresses = self._all_addresses()
    # random.randint includes both endpoints, so the upper bound must be the
    # largest value that still fits in 160 bits.
    highest_address = pow(2, 160) - 1
    while True:
        candidate = random.randint(100, highest_address)
        if candidate not in all_addresses:
            return candidate
def start_block(
    self, blocknumber=None, timestamp=None, difficulty=0, gaslimit=0, coinbase=None
):
    """
    Call ``start_block`` with the given parameters on the platform of every ready state.

    All arguments are forwarded unchanged as keyword arguments.
    """
    block_params = dict(
        blocknumber=blocknumber,
        timestamp=timestamp,
        difficulty=difficulty,
        gaslimit=gaslimit,
        coinbase=coinbase,
    )
    for state in self.ready_states:
        state.platform.start_block(**block_params)
def end_block(self):
    """Call ``end_block`` on the platform of every ready state."""
    for state in self.ready_states:
        state.platform.end_block()
def transaction(self, caller, address, value, data, gas=None, price=1):
    """Issue a symbolic transaction in all running states

    Delegates to ``_transaction`` with the "CALL" sort; nothing is returned.

    :param caller: the address of the account sending the transaction
    :type caller: int or EVMAccount
    :param address: the address of the contract to call
    :type address: int or EVMAccount
    :param value: balance to be transferred with the call
    :type value: int or BitVecVariable
    :param data: initial data
    :param gas: gas budget
    :param price: gas unit price
    :raises NoAliveStates: if there are no alive states to execute
    """
    self._transaction(
        "CALL",
        caller,
        address=address,
        value=value,
        data=data,
        gas=gas,
        price=price,
    )
def create_account(self, balance=0, address=None, code=None, name=None, nonce=None):
    """Low level creates an account. This won't generate a transaction.

    The account is created in the world of every ready state and registered
    under ``name`` in this manager.

    :param balance: balance to be set on creation (optional)
    :type balance: int or BitVecVariable
    :param address: the address for the new account (optional)
    :type address: int
    :param code: the runtime code for the new account (None means normal account), str or bytes (optional)
    :param nonce: force a specific nonce
    :param name: a global account name eg. for use as reference in the reports (optional)
    :raises NoAliveStates: if there are no ready states
    :raises EthereumError: on an invalid argument, a name or address already
        in use, or a state with a pending transaction
    :return: an EVMAccount
    """
    # Need at least one state where to apply this
    if not self.count_ready_states():
        raise NoAliveStates

    # Name check: generated names depend on whether the account carries code.
    if name is None:
        name = self._get_uniq_name("normal" if code is None else "contract")
    if name in self._accounts:
        # Account name already used
        raise EthereumError("Name already used")

    # Balance check
    if not isinstance(balance, (int, BitVec)):
        raise EthereumError("Balance invalid type")

    if isinstance(code, str):
        code = bytes(code, "utf-8")
    if not (code is None or isinstance(code, (bytes, Array))):
        raise EthereumError("code bad type")

    # Address check
    # Let's just choose the address ourself. This is not yellow paper material
    if address is None:
        address = self.new_address()
    if not isinstance(address, int):
        raise EthereumError("A concrete address is needed")
    assert address is not None
    if any(address == int(existing) for existing in self.accounts.values()):
        # Address already used
        raise EthereumError("Address already used")

    # To avoid going full crazy we maintain a global list of addresses
    # Different states may CREATE a different set of accounts.
    # Accounts created by a human have the same address in all states.
    for state in self.ready_states:
        world = state.platform
        if "_pending_transaction" in state.context:
            raise EthereumError("This is bad. There should not be a pending transaction")
        if address in world.accounts:
            # Address already used
            raise EthereumError(
                "This is bad. Same address is used for different contracts in different states"
            )
        world.create_account(address, balance, code=code, storage=None, nonce=nonce)

    self._accounts[name] = EVMAccount(address, manticore=self, name=name)
    return self.accounts[name]
def _migrate_tx_expressions(self, state, caller, address, value, data, gas, price):
    """
    Import the symbolic transaction fields and the global constraints into ``state``.

    Every symbolic argument is passed through ``state.migrate_expression``;
    concrete arguments are returned untouched. All constraints of
    ``self.constraints`` are then added to the state.

    :param state: state that will execute the transaction
    :param caller: transaction caller (concrete or symbolic)
    :param address: transaction target (concrete or symbolic)
    :param value: transferred value (concrete or symbolic)
    :param data: transaction data (concrete or symbolic)
    :param gas: gas budget (concrete or symbolic)
    :param price: gas price (concrete or symbolic)
    :return: tuple ``(caller, address, value, data, gas, price)`` with the migrated values
    """
    # Copy global constraints into each state.
    # We should somehow remember what has been copied to each state
    # In a second transaction we should only add new constraints.
    # And actually only constraints related to whatever we are using in
    # the tx. This is a FIXME
    global_constraints = self.constraints

    # Normally users will be making these symbolic expressions by creating
    # global symbolic variables via ManticoreEVM.make_.... and those
    # global expressions need to be imported into each state when a tx
    # actually happens
    if issymbolic(caller):
        caller = state.migrate_expression(caller)

    if issymbolic(address):
        address = state.migrate_expression(address)

    if issymbolic(value):
        value = state.migrate_expression(value)

    if issymbolic(data):
        if isinstance(data, ArrayProxy):  # FIXME is this necessary here?
            data = data.array
        data = state.migrate_expression(data)
        if isinstance(data, Array):
            data = ArrayProxy(array=data)

    if issymbolic(gas):
        gas = state.migrate_expression(gas)

    if issymbolic(price):
        # The migrated price must stay in `price`; storing it in `gas` would
        # clobber the gas budget and leave the price unmigrated.
        price = state.migrate_expression(price)

    for constraint in global_constraints:
        state.constrain(constraint)

    return caller, address, value, data, gas, price
def _transaction(self, sort, caller, value=0, address=None, data=None, gas=None, price=1):
    """Initiates a transaction in every ready state and runs the exploration

    :param sort: transaction type, "CREATE" or "CALL"
    :param caller: caller account
    :type caller: int or EVMAccount
    :param int address: the address for the transaction
    :param value: value to be transferred
    :param price: the price of gas for this transaction.
    :type value: int or BitVecVariable
    :param data: initializing evm bytecode and arguments or transaction call data;
        a str is encoded as UTF-8, matching what `create_account` does for `code`
    :param gas: gas budget for current transaction (defaults to `consts.defaultgas`)
    :raises TypeError: if an argument has an unsupported type
    :raises ValueError: if `sort` is not "CREATE" or "CALL"
    :raises EthereumError: on a CREATE without init bytecode or on an address
        that already exists in a state
    :raises NoAliveStates: if there are no ready states
    :return: the address the transaction was sent to
    """
    if gas is None:
        gas = consts.defaultgas

    # Type Forgiveness
    if isinstance(address, EVMAccount):
        address = int(address)
    if isinstance(caller, EVMAccount):
        caller = int(caller)

    # Defaults, call data is empty
    if data is None:
        data = b""
    if isinstance(data, str):
        # bytes(str) without an encoding always raises TypeError in Python 3.
        data = bytes(data, "utf-8")
    if not isinstance(data, (bytes, Array)):
        raise TypeError(f"code bad type {type(data)}")

    # Check types
    if not isinstance(caller, (int, BitVec)):
        raise TypeError(f"Caller invalid type {type(caller)}")

    if not isinstance(value, (int, BitVec)):
        raise TypeError(f"Value invalid type: {type(value)}")

    if not isinstance(address, (int, BitVec)):
        raise TypeError(f"address invalid type: {type(address)}")

    if not isinstance(price, int):
        raise TypeError(f"Price invalid type: {type(price)}")

    # Check argument consistency and set defaults ...
    if sort not in ("CREATE", "CALL"):
        raise ValueError(f"unsupported transaction type: {sort}")

    if sort == "CREATE":
        # When creating data is the init_bytecode + arguments
        if len(data) == 0:
            raise EthereumError("An initialization bytecode is needed for a CREATE")

    assert address is not None
    assert caller is not None

    # Transactions (like everything else) need at least one running state
    if not self.count_ready_states():
        raise NoAliveStates

    # To avoid going full crazy, we maintain a global list of addresses
    for state in self.ready_states:
        world = state.platform

        # Choose an address here, because it will be dependent on the caller's nonce in this state
        # NOTE: the type check above already rejects address=None, so this
        # branch is not reachable through the current validation.
        if address is None:
            if issymbolic(caller):
                # TODO (ESultanik): In order to handle this case, we are going to have to do something like fork
                # over all possible caller addresses.
                # But this edge case will likely be extremely rare, if ever encountered.
                raise EthereumError(
                    "Manticore does not currently support contracts with symbolic addresses creating new contracts"
                )
            address = world.new_address(caller)

        # Migrate any expression to state specific constraint set
        (
            caller_migrated,
            address_migrated,
            value_migrated,
            data_migrated,
            gas_migrated,
            price_migrated,
        ) = self._migrate_tx_expressions(state, caller, address, value, data, gas, price)

        # Different states may CREATE a different set of accounts. Accounts
        # that were created by a human have the same address in all states.
        # This diverges from the yellow paper but at least we check that we
        # are not trying to create an already used address here
        if sort == "CREATE":
            if address in world.accounts:
                # Address already used
                raise EthereumError(
                    "This is bad. Same address is used for different contracts in different states"
                )

        state.platform.start_transaction(
            sort=sort,
            address=address_migrated,
            price=price_migrated,
            data=data_migrated,
            caller=caller_migrated,
            value=value_migrated,
            gas=gas_migrated,
        )

    # run over potentially several states and
    # generating potentially several others
    self.run()

    return address
def preconstraint_for_call_transaction(
    self,
    address: Union[int, EVMAccount],
    data: Array,
    value: Optional[Union[int, Expression]] = None,
    contract_metadata: Optional[SolidityMetadata] = None,
):
    """Returns a constraint that excludes combinations of value and data that would cause an exception in the EVM
    contract dispatcher.

    The first four bytes of ``data`` must match one of the contract's function
    selectors; when ``value`` is symbolic, selectors of non-payable functions
    additionally require ``value == 0``.

    :param address: address of the contract to call
    :param value: balance to be transferred (optional)
    :param data: symbolic transaction data
    :param contract_metadata: SolidityMetadata for the contract (optional);
        looked up in ``self.metadata`` by address when omitted
    :raises TypeError: on a non-concrete address, concrete data, or missing metadata
    """
    if isinstance(address, EVMAccount):
        address = int(address)
    if not isinstance(address, int):
        raise TypeError("invalid address type")

    if not issymbolic(data):
        raise TypeError("data must be a symbolic array")

    if contract_metadata is None:
        contract_metadata = self.metadata.get(address)
        if contract_metadata is None:
            raise TypeError("no Solidity metadata available for the contract address")

    selectors = contract_metadata.function_selectors
    # Nothing to dispatch on: leave the transaction unconstrained.
    if not selectors or len(data) <= 4:
        return BoolConstant(value=True)

    symbolic_selector = data[:4]
    value_is_symbolic = issymbolic(value)

    # One clause per selector, OR-ed together in selector order.
    clauses = []
    for selector in selectors:
        clause = symbolic_selector == selector
        if value_is_symbolic and not contract_metadata.get_abi(selector)["payable"]:
            clause = Operators.AND(clause, value == 0)
        clauses.append(clause)

    combined = None
    for clause in clauses:
        combined = clause if combined is None else Operators.OR(combined, clause)
    return combined
def multi_tx_analysis(
    self,
    solidity_filename,
    contract_name=None,
    tx_limit=None,
    tx_use_coverage=True,
    tx_send_ether=True,
    tx_account="attacker",
    tx_preconstrain=False,
    args=None,
    compile_args=None,
):
    """Create a contract and explore it with a sequence of fully symbolic transactions.

    Two accounts ("owner" and "attacker") are created, the contract is
    deployed by the owner, and symbolic transactions carrying a 320 byte
    symbolic buffer are sent to it until one of the stop conditions is met.

    :param solidity_filename: forwarded to ``solidity_create_contract``
    :param contract_name: forwarded to ``solidity_create_contract``
    :param tx_limit: maximum number of symbolic transactions; ``None`` means
        no limit, a value of zero or less means no symbolic transaction is sent
    :param tx_use_coverage: if True, stop once coverage reaches 100 or a
        transaction does not increase it
    :param tx_send_ether: if True, the creation balance and every transaction
        value are symbolic; otherwise they are 0
    :param tx_account: "attacker", "owner" or "combo1" (first transaction sent
        by the owner, the following ones by the attacker)
    :param tx_preconstrain: if True, constrain each transaction with
        ``preconstraint_for_call_transaction`` before sending it
    :param args: constructor arguments, forwarded to ``solidity_create_contract``
    :param compile_args: forwarded to ``solidity_create_contract``
    :raises EthereumError: if ``tx_account`` is not one of the accepted names
    """
    owner_account = self.create_account(balance=10**10, name="owner", address=0x10000)
    attacker_account = self.create_account(balance=10**10, name="attacker", address=0x20000)
    # Pretty print
    logger.info("Starting symbolic create contract")

    if tx_send_ether:
        create_value = self.make_symbolic_value()
    else:
        create_value = 0

    contract_account = self.solidity_create_contract(
        solidity_filename,
        contract_name=contract_name,
        owner=owner_account,
        args=args,
        compile_args=compile_args,
        balance=create_value,
        gas=230000,
    )

    if tx_account == "attacker":
        tx_account = [attacker_account]
    elif tx_account == "owner":
        tx_account = [owner_account]
    elif tx_account == "combo1":
        tx_account = [owner_account, attacker_account]
    else:
        self.kill()
        raise EthereumError(
            'The account to perform the symbolic exploration of the contract should be "attacker", "owner" or "combo1"'
        )

    if contract_account is None:
        logger.info("Failed to create contract: exception in constructor")
        return

    def limit_reached(transactions_done):
        # ">=" rather than "==" so that a limit of zero or less is honoured
        # instead of never matching.
        return tx_limit is not None and transactions_done >= tx_limit

    current_coverage = 0
    tx_no = 0
    while (
        (current_coverage < 100 or not tx_use_coverage)
        and not self.is_killed()
        and not limit_reached(tx_no)
    ):
        try:
            logger.info("Starting symbolic transaction: %d", tx_no)

            # run_symbolic_tx
            symbolic_data = self.make_symbolic_buffer(320)
            if tx_send_ether:
                value = self.make_symbolic_value()
            else:
                value = 0

            if tx_preconstrain:
                self.constrain(
                    self.preconstraint_for_call_transaction(
                        address=contract_account, data=symbolic_data, value=value
                    )
                )
            self.transaction(
                # The last listed account sends every transaction past the
                # end of the list.
                caller=tx_account[min(tx_no, len(tx_account) - 1)],
                address=contract_account,
                data=symbolic_data,
                value=value,
                gas=230000,
            )

            logger.info(
                "%d alive states, %d terminated states",
                self.count_ready_states(),
                self.count_terminated_states(),
            )
        except NoAliveStates:
            break

        # Check if the maximum number of tx was reached
        if limit_reached(tx_no + 1):
            break

        # Check if coverage has improved or not
        if tx_use_coverage:
            prev_coverage = current_coverage
            current_coverage = self.global_coverage(contract_account)
            found_new_coverage = prev_coverage < current_coverage

            if not found_new_coverage:
                break

        tx_no += 1
def run(self, **kwargs):
    """Process the pending transaction of the states and prepare the next run.

    Checks that the ``ethereum.saved_states`` context list is empty, delegates
    the exploration to the parent class ``run``, and then pops every id stored
    in ``ethereum.saved_states``, reviving those found in
    ``self._terminated_states``.

    :param kwargs: forwarded unchanged to the parent class ``run``
    :raises EthereumError: if ``ethereum.saved_states`` is not empty on entry
    """
    # Ethereum can have several sequential runs, each for a different human
    # transaction. Each human transaction posts a tx over all READY states.
    # Some states will end in a REVERT or a failed TX, ultimately changing
    # very little state. Only the gas spent (and perhaps the nonce) will change
    # in the state after the attempted and failed tx. These states are not
    # considered for exploration in the next human tx/run.
    # To differentiate the successfully terminated states from the
    # reverted (or not very interesting) ones, ManticoreEVM uses another list:
    # saved_states
    # At the beginning of a human tx/run there should not be any saved state
    with self.locked_context("ethereum.saved_states", list) as saved_states:
        if saved_states:
            raise EthereumError("ethereum.saved_states should be empty")

    # Every state.world has its pending_transaction filled. The run will
    # process it and potentially generate several READY and/or TERMINATED states.
    super().run(**kwargs)

    # The run may have finished by timeout/cancel or by state exhaustion.
    # At this point we potentially have some READY states and some TERMINATED states
    # No busy states though
    # If there are ready states still then it was a paused execution
    assert not self._ready_states
    assert not self._busy_states
    assert not self.is_running()

    # ManticoreEthereum decided at terminate_state_callback which state is
    # ready for next run and saved them at the context item
    # 'ethereum.saved_states'
    # Move successfully terminated states to ready states
    with self.locked_context("ethereum.saved_states", list) as saved_states:
        # Popping empties the shared list, so the next run starts clean.
        while saved_states:
            state_id = saved_states.pop()
            if state_id in self._terminated_states:
                self._revive_state(state_id)
# Callbacks
def _on_concretize(self, state, func, data, result):
    """Evaluate ``func`` on a single concrete value of ``data``.

    Concrete data is passed to ``func`` directly. For symbolic data one
    solution is taken from ``state``, ``func`` is applied to it, and the state
    is constrained so that ``data`` equals that solution.

    :param state: state used to solve and constrain ``data``
    :param func: concrete python function to apply
    :param data: concrete or symbolic input for ``func``
    :param result: list to which the computed value is appended
    """
    if issymbolic(data):
        with self.locked_context("ethereum", dict):
            # adding a single random example so we can explore further
            picked_input = state.solve_one(data)
            outcome = func(picked_input)
            # Pin the data to the single sound concretization
            state.constrain(picked_input == data)
        result.append(outcome)
        return

    result.append(func(data))
def _on_unsound_symbolication(self, state, func, data, result):
    """Apply the symbolic-unfriendly function ``func`` to ``data``.

    :param state: a manticore state
    :param func: a concrete normal python function like `sha3()`
    :param data: a concrete or symbolic value of the domain of func
    :param result: a list to which the outcome is appended

    When ``func(data)`` returns a value, the ``(data, value)`` pair is
    recorded in the shared "ethereum" context and in ``state.context`` under
    ``symbolic_func_conc_<name>``, and that value is the outcome.

    When ``func(data)`` returns None, the outcome is a symbol standing for
    the unknown result. The relation between inputs and outputs is kept in
    ``state.context`` under ``symbolic_func_sym_<name>`` as a list of pairs,
    effectively an uninstantiated function:

        func(data) -> result = constraints.new_bitvec()
        symbolic_pairs.append((data, result))
        constraints.add(func_table is injective)
    """
    name = func.__name__
    value = func(data)  # If func returns None then result should be considered unknown/symbolic

    # Value is known. Let's add it to our concrete database
    if value is not None:
        with self.locked_context("ethereum", dict) as ethereum_context:
            global_known_pairs = ethereum_context.get(f"symbolic_func_conc_{name}", set())
            if (data, value) not in global_known_pairs:
                global_known_pairs.add((data, value))
                ethereum_context[f"symbolic_func_conc_{name}"] = global_known_pairs
                logger.info(f"Found a concrete {name} {data} -> {value}")
        concrete_pairs = state.context.get(f"symbolic_func_conc_{name}", set())
        concrete_pairs.add((data, value))
        state.context[f"symbolic_func_conc_{name}"] = concrete_pairs
    else:
        # We cannot calculate the concrete value; let's use a fresh symbol
        with self.locked_context("ethereum", dict) as ethereum_context:
            functions = ethereum_context.get("symbolic_func", dict())
            if name in functions and functions[name] != func:
                logger.debug("Redefining symbolic function. Same name different functions")
            functions[name] = func
            ethereum_context["symbolic_func"] = functions

        # We are adding a new pair to the symbolic pairs
        # Reset the soundcheck if True
        if state.context.get("soundcheck", None) == True:
            state.context["soundcheck"] = None

        # Replace data by a fresh buffer constrained to be equal to it
        data_var = state.new_symbolic_buffer(len(data))  # FIXME: generalize to bitvec
        state.constrain(data_var == data)
        data = data_var

        # symbolic_pairs: list of symbolic applications of func in this state
        symbolic_pairs = state.context.get(f"symbolic_func_sym_{name}", [])

        # let's make a fresh 256 bit symbol representing any potential hash
        value = state.new_symbolic_value(256)

        for x, y in symbolic_pairs:
            # if we found another pair that matches use that instead
            # the duplicated pair is not added to symbolic_pairs
            if state.must_be_true(Operators.OR(x == data, y == value)):
                data, value = x, y
                break
        else:
            # New pair (the loop above ended without a break)
            # add basic no-collision conditions; new pair is added to symbolic_pairs
            for x, y in symbolic_pairs:
                if len(x) == len(data):
                    # same size inputs: equal inputs if and only if equal outputs
                    constraint = (x == data) == (y == value)
                else:
                    # different size inputs: outputs must differ
                    constraint = y != value
                state.constrain(constraint)
            symbolic_pairs.append((data, value))
            state.context[f"symbolic_func_sym_{name}"] = symbolic_pairs

    # let it return just new_hash
    result.append(value)
def fix_unsound_symbolication_fake(self, state):
    """Constrain every symbolic function result of ``state`` and report feasibility.

    For each function name registered under "symbolic_func" in the shared
    "ethereum" context, every pair stored in ``state.context`` under
    ``symbolic_func_sym_<name>`` gets the constraint
    ``Operators.EXTRACT(y, 0, 16) == 0`` on its result ``y``.

    :param state: the state to constrain
    :return: whether the state can still be satisfied afterwards
    """
    with self.locked_context("ethereum", dict) as ethereum_context:
        registered = ethereum_context.get("symbolic_func", list())

    for table in registered:
        pairs = state.context.get(f"symbolic_func_sym_{table}", ())
        # Make every result distant from each other
        for _unused_input, output in pairs:
            state.constrain(Operators.EXTRACT(output, 0, 16) == 0)

    # Ok all functions had a match for current state
    return state.can_be_true(True)
def fix_unsound_symbolication_sound(self, state):
    """This method goes through all the applied symbolic functions and tries
    to find a concrete matching set of pairs

    :param state: the state whose symbolic function applications are checked
    :return: False as soon as one function table cannot be matched; otherwise
        whether the state is still feasible after the known pairs were added
        to its constraints
    """

    def concretize_known_pairs(state, symbolic_pairs, known_pairs):
        # Each symbolic pair must match at least one of the concrete
        # pairs we know
        for xa, ya in symbolic_pairs:
            cond = False
            for xc, yc in known_pairs:
                if len(xa) == len(xc):
                    cond = Operators.OR(Operators.AND(xa == xc, ya == yc), cond)
            state.constrain(cond)
            # No known pair of a matching size: stop after constraining with False
            if cond is False:
                return

    def match(state, func, symbolic_pairs, concrete_pairs, start=None):
        """Tries to find a concrete match for the symbolic pairs. It uses
        concrete_pairs (and potentially extends it with solved pairs) until
        a matching set of concrete pairs is found, or fail.

        state: the current state
        func: the relation between domain and range in symbolic_pairs/concrete_pairs
        symbolic_pairs: related symbolic values that need a set of solutions
        concrete_pairs: known concrete pairs that may match some of the symbolic pairs
        start: time.time() value taken when the search began; both call sites
            pass it explicitly and the timeout check below needs a number
        """

        if time.time() - start > consts.sha3timeout:
            logger.info("Timeout on sound check for state %d", state.id)
            return False

        # The base case. No symbolic pairs. Just check if the state is feasible.
        if not symbolic_pairs:
            return state.can_be_true(True)

        if not state.can_be_true(True):
            return False
        for i in range(len(symbolic_pairs)):
            # shuffle and if it failed X times at some depth bail configurable
            x, y = symbolic_pairs[i]
            # every symbolic pair except the one being matched now
            new_symbolic_pairs = symbolic_pairs[:i] + symbolic_pairs[i + 1 :]
            new_concrete_pairs = set(concrete_pairs)

            unseen = True  # Is added only unseen pairs could make it sat
            for x_concrete, y_concrete in concrete_pairs:
                if len(x) == len(x_concrete):  # If the size of the buffer won't
                    # match it does not matter
                    unseen = Operators.AND(x != x_concrete, y != y_concrete, unseen)
            # Search for a new unseen sha3 pair
            with state as temp_state:
                temp_state.constrain(unseen)
                new_x_concretes = temp_state.solve_n(x, nsolves=1)
                new_y_concretes = map(func, new_x_concretes)
                new_concrete_pairs.update(zip(new_x_concretes, new_y_concretes))

                # The string below is a no-op expression statement holding a
                # design note; its contents are kept exactly as written.
                """
Idea:
new_x_concretes = check_offline_db(temp_state.solve_n(y, nsolves=1))
new_y_concretes = map(func, new_x_concretes)
new_concrete_pairs.update(zip(new_x_concretes, new_y_concretes))
"""

            # Consider all the new set of sha3 pairs and rebuild the seen condition
            seen = False
            for x_concrete, y_concrete in new_concrete_pairs:
                if len(x) == len(x_concrete):  # If the size of the buffer won't
                    # match it does not matter
                    seen = Operators.OR(Operators.AND(x == x_concrete, y == y_concrete), seen)

            # With the current x,y pair being one of the known sha3 pairs try
            # to match the following symbolic pairs
            with state as temp_state:
                temp_state.constrain(seen)
                if match(temp_state, func, new_symbolic_pairs, new_concrete_pairs, start=start):
                    # Publish the pairs found on the successful branch to the caller's set
                    concrete_pairs.update(new_concrete_pairs)
                    return True
        return False

    # Snapshot the registered functions and their known concrete pairs
    with self.locked_context("ethereum", dict) as ethereum_context:
        functions = ethereum_context.get("symbolic_func", list())
        known_pairs_dict = {}
        for table in functions:
            known_pairs_dict[table] = ethereum_context.get(f"symbolic_func_conc_{table}", set())
    for table in functions:
        symbolic_pairs = state.context.get(f"symbolic_func_sym_{table}", ())
        known_pairs = known_pairs_dict[table]
        new_known_pairs = set(known_pairs)
        # Add concrete knowledge in
        for xa, ya in symbolic_pairs:
            for xc, yc in known_pairs:
                state.constrain(Operators.OR(xa == xc, ya != yc))
                state.constrain(Operators.OR(xa != xc, ya == yc))

        # Keep only pairs that have not yet been fixed to a single solution
        symbolic_pairs = [
            (xa, ya)
            for xa, ya in symbolic_pairs
            if len(state.solve_n(xa[0], nsolves=2)) != 1
            or len(state.solve_n(ya, nsolves=2)) != 1
        ]

        if not match(
            state, functions[table], symbolic_pairs, new_known_pairs, start=time.time()
        ):
            return False

        # Now paste the known pairs in the state constraints
        concretize_known_pairs(state, symbolic_pairs, new_known_pairs)
        state.context[f"symbolic_func_sym_{table}_done"] = symbolic_pairs

        # Share the concrete pairs with other states.
        # Depends on the order in which the states are processed
        # with self.locked_context("ethereum", dict) as ethereum_context:
        #    ethereum_context[f"symbolic_func_conc_{table}"] = new_known_pairs

    # Ok all functions had a match for current state
    return state.can_be_true(True)
def fix_unsound_symbolication(self, state):
    """Return the soundness verdict for ``state``, computing it at most once.

    A verdict already stored in ``state.context["soundcheck"]`` is returned
    as is. Otherwise it is computed according to the ``sha3`` configuration
    value (``symbolicate`` or ``fake``; any other setting counts as sound) and
    stored in the state context before being returned.

    :param state: the state to check
    :return: the stored or newly computed verdict
    """
    cached_verdict = state.context.get("soundcheck", None)
    if cached_verdict is not None:
        return cached_verdict

    if consts.sha3 is consts.sha3.symbolicate:
        verdict = self.fix_unsound_symbolication_sound(state)
    elif consts.sha3 is consts.sha3.fake:
        verdict = self.fix_unsound_symbolication_fake(state)
    else:
        verdict = True

    state.context["soundcheck"] = verdict
    return state.context["soundcheck"]
def _terminate_state_callback(self, state, e):
    """INTERNAL USE
    Every time a state finishes executing the last transaction, we save it in
    our private list

    :param state: the state that terminated
    :param e: the exception that terminated it; stored in
        ``state.context["last_exception"]`` and flagged so that no testcase
        file is generated for it
    """
    if isinstance(e, AbandonState):
        # do nothing
        return

    world = state.platform
    state.context["last_exception"] = e
    e.testcase = False  # Do not generate a testcase file

    if not world.all_transactions:
        logger.debug("Something went wrong: search terminated in the middle of an ongoing tx")
        return

    tx = world.all_transactions[-1]

    # we initiated the Tx; we need process the outcome for now.
    # Fixme incomplete.
    # Disabled handling, kept for reference:
    #   if tx.is_human:
    #       if tx.sort == "CREATE":
    #           if tx.result == "RETURN":
    #               world.set_code(tx.address, tx.return_data)
    #           else:
    #               world.delete_account(tx.address)
    #   else:
    #       logger.info(
    #           "Manticore exception: state should be terminated only at the end of the human transaction"
    #       )

    # Human tx that ends in this wont modify the storage so finalize and
    # generate a testcase. FIXME This should be configurable as REVERT and
    # THROW; it actually changes the balance and nonce? of some accounts
    if tx.return_value == 0:
        return

    # if not a revert, we save the state for further transactions
    with self.locked_context("ethereum.saved_states", list) as saved_states:
        saved_states.append(state.id)
# Callbacks
def _did_evm_execute_instruction_callback(self, state, instruction, arguments, result):
"""INTERNAL USE"""
# logger.debug("%s", state.platform.current_vm)
# TODO move to a plugin
at_init = state.platform.current_transaction.sort == "CREATE"
coverage_context_name = "evm.coverage"
with self.locked_context(coverage_context_name, list) as coverage:
if (state.platform.current_vm.address, instruction.pc, at_init) not in coverage:
coverage.append((state.platform.current_vm.address, instruction.pc, at_init))
state.context.setdefault("evm.trace", []).append(
(state.platform.current_vm.address, instruction.pc, at_init)
)
def get_metadata(self, address) -> Optional[SolidityMetadata]:
    """Look up the solidity metadata stored for ``address``.

    Metadata is available only if address is a contract created from solidity.

    :param address: key used for the lookup in ``self.metadata``
    :return: the stored metadata, or None when there is none for the address
    """
    known_metadata = self.metadata
    return known_metadata.get(address)
def register_detector(self, d):
    """
    Registers a detector: stores it in ``self.detectors`` under its name and
    registers it as a plugin.

    :param d: the Detector instance to register
    :return: the name of the detector
    :raises EthereumError: if ``d`` is not a Detector or a detector with the
        same name is already registered
    """
    if not isinstance(d, Detector):
        raise EthereumError("Not a Detector")

    registry = self.detectors
    if d.name in registry:
        raise EthereumError("Detector already registered")

    registry[d.name] = d
    self.register_plugin(d)
    return d.name
def unregister_detector(self, d):
    """
    Unregisters a detector. This will invoke detector's `on_unregister` callback.
    Shall be called after `.finalize` - otherwise, finalize won't add detector's finding to `global.findings`.

    :param d: the Detector instance, or the name of a registered detector
    :raises EthereumError: if ``d`` is neither a Detector nor a string, or if
        no detector is registered under that name
    """
    if isinstance(d, Detector):
        name = d.name
    elif isinstance(d, str):
        name = d
    else:
        raise EthereumError("Not a Detector")

    if name not in self.detectors:
        raise EthereumError("Detector not registered")

    # Always unregister the instance that was stored, not the argument.
    registered = self.detectors.pop(name)
    self.unregister_plugin(registered)
@property
def workspace(self):
    """URI of the store behind this instance's workspace."""
    backing_store = self._workspace._store
    return backing_store.uri
@property
def global_findings(self):
    """Findings stored in ``self._global_findings``.

    :raises Exception: if the attribute has not been set yet
    """
    try:
        findings = self._global_findings
    except AttributeError:
        raise Exception("You need to call finalize() first")
    else:
        return findings
def current_location(self, state):
    """Describe where ``state`` is currently executing.

    The text holds the address and program counter of the current VM as
    written by ``write_findings`` and, when solidity metadata exists for the
    address, the matching source snippet.

    :param state: the state to describe
    :return: the description as a string
    """
    world = state.platform
    vm = world.current_vm
    address, pc = vm.address, vm.pc
    at_init = world.current_transaction.sort == "CREATE"

    output = io.StringIO()
    write_findings(output, "", address, pc, at_init)

    md = self.get_metadata(address)
    if md is None:
        return output.getvalue()

    # Constructor code is looked up in the init bytecode, anything else in the runtime one
    src = md.get_source_for(pc, runtime=not at_init)
    for chunk in ("Snippet:\n", src.replace("\n", "\n ").strip(), "\n"):
        output.write(chunk)
    return output.getvalue()
def generate_testcase(self, state, message="", only_if=None, name="user"):
    """
    Generate a testcase for ``state``, optionally only when a condition can hold.

    The only_if parameter should be a symbolic expression. If this argument is provided, and the expression
    *can be true* in this state, a testcase is generated such that the expression will be true in the state.
    If it *is impossible* for the expression to be true in the state, a testcase is not generated.

    This is useful for conveniently checking a particular invariant in a state, and generating a testcase if
    the invariant can be violated.

    For example, invariant: "balance" must not be 0. We can check if this can be violated and generate a
    testcase::

        m.generate_testcase(state, 'balance CAN be 0', only_if=balance == 0)
        # testcase generated with an input that will violate invariant (make balance == 0)

    :param state: the state to generate the testcase from
    :param message: longer description of the testcase condition
    :param only_if: optional expression added as a constraint before generating
    :param name: short prefix for the testcase output
    :return: the testcase, or None if a ManticoreError was raised
    """
    try:
        # Work on a temporary state so the caller's state is not constrained.
        with state as scratch_state:
            if only_if is not None:
                scratch_state.constrain(only_if)
            testcase = self._generate_testcase_ex(scratch_state, message, name=name)
    except ManticoreError:
        return None
    return testcase
def _generate_testcase_ex(self, state, message="", name="user"):
    """
    Generate a testcase in the outputspace for the given program state.
    An exception is raised if the state is unsound or unfeasible

    On return you get a Testcase containing all the informations about the state.
    A Testcase is a collection of files living at the OutputSpace.
    Registered plugins and other parts of Manticore add files to the Testcase.
    User can add more streams to is like this:

        testcase = m.generate_testcase_ex(state)
        with testcase.open_stream("txt") as descf:
            descf.write("This testcase is interesting!")

    The streams written here are "findings" (only when there are findings),
    "summary", "tx", "tx.json", "logs" (only when the ``events`` configuration
    value is set), "constraints" and "trace" (only when a trace was recorded).

    :param manticore.core.state.State state:
    :param str message: longer description of the testcase condition
    :param str name: short string used as the prefix for the outputspace key (e.g. filename prefix for testcase files)
    :return: a Testcase
    :raises ManticoreError: if the state is unsound
    """
    # Refuse to generate a testcase from an unsound state
    if not self.fix_unsound_symbolication(state):
        raise ManticoreError("Trying to generate a testcase out of an unsound state path")

    blockchain = state.platform

    # FIXME. workspace should not be responsible for formating the output
    # each object knows its secrets, and each class should be able to report
    # its final state
    testcase = super().generate_testcase(
        state, message + f"({len(blockchain.human_transactions)} txs)", name=name
    )

    # One entry per (address, pc, finding, at_init). The constraint is kept as
    # the value so that it does not take part in the duplicate check.
    local_findings = {}
    for detector in self.detectors.values():
        for address, pc, finding, at_init, constraint in detector.get_findings(state):
            key = (address, pc, finding, at_init)
            if key not in local_findings and state.can_be_true(constraint):
                local_findings[key] = constraint

    if local_findings:
        with testcase.open_stream("findings") as findings:
            for (address, pc, finding, at_init), constraint in local_findings.items():
                # Constraints added for earlier findings may have made this one infeasible
                if not state.can_be_true(constraint):
                    continue
                state.constrain(constraint)
                findings.write("- %s -\n" % finding)
                write_findings(findings, " ", address, pc, at_init)
                md = self.get_metadata(address)
                if md is not None:
                    src = md.get_source_for(pc, runtime=not at_init)
                    findings.write(" Snippet:\n")
                    findings.write(src.replace("\n", "\n ").strip())
                    findings.write("\n")

    with testcase.open_stream("summary") as stream:
        is_something_symbolic = state.platform.dump(stream, state, self, message)

        with self.locked_context("ethereum", dict) as ethereum_context:
            functions = ethereum_context.get("symbolic_func", dict())
            for table in functions:
                concrete_pairs = state.context.get(f"symbolic_func_conc_{table}", ())
                if concrete_pairs:
                    stream.write(f"Known for {table}:\n")
                    for key, value in concrete_pairs:
                        stream.write("%s::%x\n" % (binascii.hexlify(key), value))

        if is_something_symbolic:
            stream.write(
                "\n\n(*) Example solution given. Value is symbolic and may take other values\n"
            )

    # Transactions
    with testcase.open_stream("tx") as tx_summary:
        with testcase.open_stream("tx.json") as txjson:
            txlist = []
            is_something_symbolic = False

            for sym_tx in blockchain.human_transactions:  # external transactions
                tx_summary.write(
                    "Transactions No. %d\n" % blockchain.transactions.index(sym_tx)
                )

                conc_tx = sym_tx.concretize(state)
                txlist.append(conc_tx.to_dict(self))

                # Accumulate the flag: the footnote applies when any
                # transaction had something symbolic, not only the last one.
                tx_is_symbolic = sym_tx.dump(tx_summary, state, self, conc_tx=conc_tx)
                is_something_symbolic = tx_is_symbolic or is_something_symbolic

            if is_something_symbolic:
                tx_summary.write(
                    "\n\n(*) Example solution given. Value is symbolic and may take other values\n"
                )

            json.dump(txlist, txjson)

    # logs
    if consts.events:
        with testcase.open_stream("logs") as logs_summary:
            is_something_symbolic = False
            for log_item in blockchain.logs:
                is_log_symbolic = issymbolic(log_item.memlog)
                is_something_symbolic = is_log_symbolic or is_something_symbolic
                solved_memlog = state.solve_one(log_item.memlog)

                logs_summary.write("Address: %x\n" % log_item.address)
                logs_summary.write(
                    "Memlog: %s (%s) %s\n"
                    % (
                        binascii.hexlify(solved_memlog).decode(),
                        printable_bytes(solved_memlog),
                        flagged(is_log_symbolic),
                    )
                )
                logs_summary.write("Topics:\n")
                for i, topic in enumerate(log_item.topics):
                    logs_summary.write(
                        "\t%d) %x %s" % (i, state.solve_one(topic), flagged(issymbolic(topic)))
                    )

    with testcase.open_stream("constraints") as smt_summary:
        smt_summary.write(str(state.constraints))

    trace = state.context.get("evm.trace")
    if trace:
        with testcase.open_stream("trace") as f:
            self._emit_trace_file(f, trace)
    return testcase
@staticmethod
def _emit_trace_file(filestream, trace):
"""
:param filestream: file object for the workspace trace file
:param trace: list of (contract address, pc) tuples
:type trace: list[tuple(int, int)]
"""
for contract, pc, at_init in trace:
if pc == 0:
filestream.write("---\n")
ln = "0x{:x}:0x{:x} {}\n".format(contract, pc, "*" if at_init else "")
filestream.write(ln)
@ManticoreBase.at_not_running
def finalize(self, procs=None, only_alive_states=False):
    """
    Terminate and generate testcases for all currently alive states (contract
    states that cleanly executed to a STOP or RETURN in the last symbolic
    transaction), then write the global reports ("global.findings",
    "global.summary" and, per contract with metadata, its source, annotated
    disassembly and visited offsets).

    :param procs: force the number of local processes to use in the reporting
        generation. Uses global configuration constant by default
    :param bool only_alive_states: if True, killed states (revert/throw/txerror) do not generate testscases
    """
    if procs is None:
        procs = config.get_group("core").procs

    logger.debug("Finalizing %d states.", self.count_states())

    def finalizer(state_id):
        st = self._load(state_id)
        if self.fix_unsound_symbolication(st):
            last_tx = st.platform.last_transaction
            # Do not generate killed state if only_alive_states is True.
            # last_tx may be missing (see the fallback message below), so it
            # is checked before its result is read.
            if (
                only_alive_states
                and last_tx
                and last_tx.result in {"REVERT", "THROW", "TXERROR"}
            ):
                return
            logger.debug("Generating testcase for state_id %d", state_id)
            message = last_tx.result if last_tx else "NO STATE RESULT (?)"
            self.generate_testcase(st, message=message)

    def worker_finalize(q):
        # Blocking get plus a None sentinel: a non-blocking get can report an
        # empty queue while items are still on their way, which would make a
        # worker quit before every state was handled.
        while True:
            state_id = q.get()
            if state_id is None:
                return
            finalizer(state_id)

    # Generate testcases for all but killed states
    q = Queue()
    for state_id in self._all_states:
        # we need to remove -1 state before forking because it may be in memory
        q.put(state_id)
    # One sentinel per worker so that each of them stops after the real work
    for _ in range(procs):
        q.put(None)

    report_workers = [Process(target=worker_finalize, args=(q,)) for _ in range(procs)]
    for proc in report_workers:
        proc.start()

    for proc in report_workers:
        proc.join()

    global_findings = set()
    for state in self.all_states:
        if not self.fix_unsound_symbolication(state):
            continue
        for detector in self.detectors.values():
            for address, pc, finding, at_init, constraint in detector.get_findings(state):
                if (address, pc, finding, at_init) not in global_findings:
                    if state.can_be_true(constraint):
                        global_findings.add((address, pc, finding, at_init))
    self._global_findings = global_findings

    # global summary
    with self._output.save_stream("global.findings") as global_findings_stream:
        for address, pc, finding, at_init in self.global_findings:
            global_findings_stream.write("- %s -\n" % finding)
            write_findings(global_findings_stream, " ", address, pc, at_init)
            md = self.get_metadata(address)
            if md is not None:
                source_code_snippet = md.get_source_for(pc, runtime=not at_init)
                global_findings_stream.write(" Solidity snippet:\n")
                global_findings_stream.write(" ".join(source_code_snippet.splitlines(True)))
                global_findings_stream.write("\n")

    with self._output.save_stream("global.summary") as global_summary:
        # (accounts created by contract code are not in this list )
        global_summary.write("Global runtime coverage:\n")
        for address in self.contract_accounts.values():
            global_summary.write(
                "{:x}: {:2.2f}%\n".format(int(address), self.global_coverage(address))
            )

        with self.locked_context("ethereum", dict) as ethereum_context:
            functions = ethereum_context.get("symbolic_func", dict())
            for table in functions:
                concrete_pairs = ethereum_context.get(f"symbolic_func_conc_{table}", ())
                if concrete_pairs:
                    global_summary.write(f"Known for {table}:\n")
                    for key, value in concrete_pairs:
                        global_summary.write("%s::%x\n" % (binascii.hexlify(key), value))

    for address, md in self.metadata.items():
        with self._output.save_stream("global_%s.sol" % md.name) as global_src:
            global_src.write(md.source_code)

        # Disassembly of the runtime code; visited instructions are marked with "*"
        with self._output.save_stream(
            "global_%s.runtime_asm" % md.name
        ) as global_runtime_asm, self.locked_context("runtime_coverage") as seen:
            for i in EVMAsm.disassemble_all(md.runtime_bytecode):
                if (address, i.pc) in seen:
                    global_runtime_asm.write("*")
                else:
                    global_runtime_asm.write(" ")
                global_runtime_asm.write("%4x: %s\n" % (i.pc, i))

        # Disassembly of the init code; visited instructions are marked with "*"
        with self._output.save_stream(
            "global_%s.init_asm" % md.name
        ) as global_init_asm, self.locked_context("init_coverage") as seen:
            for i in EVMAsm.disassemble_all(md.init_bytecode):
                if (address, i.pc) in seen:
                    global_init_asm.write("*")
                else:
                    global_init_asm.write(" ")
                global_init_asm.write("%4x: %s\n" % (i.pc, i))

        with self._output.save_stream(
            "global_%s.init_visited" % md.name
        ) as f, self.locked_context("init_coverage") as seen:
            visited = {o for (a, o) in seen if a == address}
            for o in sorted(visited):
                f.write("0x%x\n" % o)

        with self._output.save_stream(
            "global_%s.runtime_visited" % md.name
        ) as f, self.locked_context("runtime_coverage") as seen:
            visited = {o for (a, o) in seen if a == address}
            for o in sorted(visited):
                f.write("0x%x\n" % o)

    self.save_run_data()
    self.remove_all()
def global_coverage(self, account):
    """Returns code coverage for the contract on `account`.

    The runtime code is taken from the first state whose world contains the
    account, and the visited offsets come from the shared "evm.coverage"
    list (entries recorded during contract creation are ignored).

    :param account: the account, anything ``int()`` accepts
    :return: the value computed by ``calculate_coverage``, or 0.0 when no
        state contains the account
    """
    account_address = int(account)
    runtime_bytecode = None

    # Search one state in which the account_address exists
    for candidate in self.all_states:
        platform = candidate.platform
        if account_address not in platform:
            continue
        runtime_bytecode = candidate.solve_one(platform.get_code(account_address))
        break
    else:
        return 0.0

    seen = set()
    with self.locked_context("evm.coverage") as coverage:
        for addr, off, init in coverage:
            if addr == account_address and not init:
                seen.add(off)
    return calculate_coverage(runtime_bytecode, seen)
@property  # type: ignore
@ManticoreBase.sync
@ManticoreBase.at_not_running
def ready_sound_states(self):
    """
    Iterator over sound ready states.

    This tries to solve any symbolic imprecision added by unsound_symbolication
    and then iterates over the resultant set.

    This is the recommended way to iterate over the resultant states after
    an exploration that included unsound symbolication
    """
    self.fix_unsound_all()
    for state_id in self._ready_states:
        candidate = self._load(state_id)
        if not self.fix_unsound_symbolication(candidate):
            continue
        yield candidate
        # Re-save the state in case the user changed its data
        self._save(candidate, state_id=state_id)
@property  # type: ignore
@ManticoreBase.sync
@ManticoreBase.at_not_running
def all_sound_states(self):
    """
    Iterator over all sound states.

    This tries to solve any symbolic imprecision added by unsound_symbolication
    and then iterates over the resultant set.

    This is the recommended way to iterate over the resultant states after
    an exploration that included unsound symbolication
    """
    self.fix_unsound_all()
    # Every state id, not only the ready ones, so that the iterator matches its name
    for state_id in self._all_states:
        state = self._load(state_id)
        if self.fix_unsound_symbolication(state):
            yield state
            # Re-save the state in case the user changed its data
            self._save(state, state_id=state_id)
def fix_unsound_all(self, procs=None):
    """
    Run the soundness check on every state using worker processes and save
    each state back afterwards.

    :param procs: force the number of local processes to use. Uses the
        ``procs`` value of the "core" configuration group by default
    """
    if procs is None:
        procs = config.get_group("core").procs

    # Fix unsoundness in all states
    def finalizer(state_id):
        state = self._load(state_id)
        self.fix_unsound_symbolication(state)
        self._save(state, state_id=state_id)

    def worker_finalize(q):
        # Blocking get plus a None sentinel: a non-blocking get can report an
        # empty queue while items are still on their way, which would make a
        # worker quit before every state was handled.
        while True:
            state_id = q.get()
            if state_id is None:
                return
            finalizer(state_id)

    q = Queue()
    for state_id in self._all_states:
        # we need to remove -1 state before forking because it may be in memory
        q.put(state_id)
    # One sentinel per worker so that each of them stops after the real work
    for _ in range(procs):
        q.put(None)

    report_workers = [Process(target=worker_finalize, args=(q,)) for _ in range(procs)]

    for proc in report_workers:
        proc.start()

    for proc in report_workers:
        proc.join()