lib/cuckoo/common/abstracts.py
# Copyright (C) 2010-2012 Cuckoo Sandbox Developers.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# See the file 'docs/LICENSE' for copying permission.
import os
import logging
import ConfigParser
from lib.cuckoo.common.exceptions import CuckooMachineError, CuckooOperationalError, CuckooReportError
from lib.cuckoo.common.constants import CUCKOO_ROOT
from lib.cuckoo.common.utils import create_folder
log = logging.getLogger(__name__)
class Dictionary(dict):
    """Cuckoo custom dict whose items are also reachable as attributes.

    Reading an attribute that is not a stored key yields None instead of
    raising; assigning or deleting an attribute sets or removes the key of
    the same name.
    """

    def __getattr__(self, key):
        # Only invoked when regular attribute lookup fails, so real dict
        # methods keep working; unknown names fall back to None.
        return self.get(key)

    def __setattr__(self, key, value):
        # Attribute assignment is stored as a plain dict item.
        dict.__setitem__(self, key, value)

    def __delattr__(self, key):
        # Attribute deletion removes the dict item (KeyError if absent).
        dict.__delitem__(self, key)
class MachineManager(object):
    """Base abstract class for analysis machine manager.

    Holds the list of configured machines together with their lock state.
    start(), stop() and _list() are abstract and raise NotImplementedError
    here.
    """

    def __init__(self):
        self.module_name = ""
        self.options = None
        self.machines = []

    def set_options(self, options):
        """Set machine manager options.
        @param options: machine manager options dict.
        """
        self.options = options

    def initialize(self, module_name):
        """Read and load machines configuration, try to check the configuration.
        @param module_name: module name.
        """
        # Load.
        self._initialize(module_name)
        # Run initialization checks.
        self._initialize_check()

    def _initialize(self, module_name):
        """Read configuration.

        Looks up the comma separated "machines" entry of the module section
        and appends one unlocked machine entry per id to self.machines.
        Machines whose configuration cannot be read are skipped with a
        warning.
        @param module_name: module name.
        """
        self.module_name = module_name
        mmanager_opts = self.options.get(module_name)

        for raw_id in mmanager_opts["machines"].split(","):
            # Strip every id on its own so that "vm1, vm2" resolves to the
            # sections "vm1" and "vm2" rather than "vm1" and " vm2".
            machine_id = raw_id.strip()
            try:
                machine_opts = self.options.get(machine_id)
                machine = Dictionary()
                machine.id = machine_id
                machine.label = machine_opts["label"]
                machine.platform = machine_opts["platform"]
                machine.ip = machine_opts["ip"]
                machine.locked = False
                self.machines.append(machine)
            except (AttributeError, KeyError, TypeError, CuckooOperationalError):
                # KeyError: the section lacks label/platform/ip.
                # TypeError: the lookup returned None instead of a section.
                # Both mean "details are missing", like the other two.
                log.warning("Configuration details about machine %s are missing. Continue", machine_id)
                continue

    def _initialize_check(self):
        """Runs checks against virtualization software when a machine manager is initialized.
        @note: in machine manager modules you may override or superclass this method.
        @raise CuckooMachineError: meant for a misconfiguration or an unknown vm state;
                                   this base implementation does not raise it directly.
        """
        try:
            # NOTE: the listing is requested but its result is not inspected;
            # the check of every configured label against it is disabled.
            self._list()
        except NotImplementedError:
            # The manager does not provide a listing: nothing to verify.
            pass

    def availables(self):
        """How many machines are free.
        @return: free machines count.
        """
        return sum(1 for machine in self.machines if not machine.locked)

    def acquire(self, machine_id=None, platform=None):
        """Acquire a machine to start analysis.

        The first unlocked machine matching the request is locked and
        returned. machine_id takes precedence over platform; when neither is
        given any unlocked machine qualifies.
        @param machine_id: machine ID.
        @param platform: machine platform.
        @return: machine or None.
        """
        for machine in self.machines:
            if machine_id:
                if machine.id != machine_id:
                    continue
            elif platform:
                if machine.platform != platform:
                    continue
            if machine.locked:
                continue
            machine.locked = True
            return machine
        return None

    def release(self, label=None):
        """Release a machine.

        Unlocks every machine carrying the given label; does nothing when no
        label is given.
        @param label: machine name.
        """
        if not label:
            return
        for machine in self.machines:
            if machine.label == label:
                machine.locked = False

    def running(self):
        """Returns running virtual machines.
        @return: list of the machines that are currently locked.
        """
        return [machine for machine in self.machines if machine.locked]

    def start(self, label=None):
        """Start a machine.
        @param label: machine name.
        @raise NotImplementedError: this method is abstract.
        """
        raise NotImplementedError

    def stop(self, label=None):
        """Stop a machine.
        @param label: machine name.
        @raise NotImplementedError: this method is abstract.
        """
        raise NotImplementedError

    def _list(self, vmx=None):
        """Lists virtual machines configured.
        @param vmx: unused by this base implementation.
        @raise NotImplementedError: this method is abstract.
        """
        raise NotImplementedError
class Processing(object):
    """Base abstract class for processing module."""

    def __init__(self):
        self.analysis_path = ""
        self.logs_path = ""

    def set_path(self, analysis_path):
        """Set paths.

        Stores the analysis folder and derives from it the locations of the
        individual files and sub folders used by processing modules.
        @param analysis_path: analysis folder path.
        """
        self.analysis_path = analysis_path

        # Attribute name -> entry name inside the analysis folder.
        layout = (
            ("log_path", "analysis.log"),
            ("conf_path", "analysis.conf"),
            ("dropped_path", "files"),
            ("logs_path", "logs"),
            ("shots_path", "shots"),
            ("pcap_path", "dump.pcap"),
        )
        for attribute, entry in layout:
            setattr(self, attribute, os.path.join(analysis_path, entry))

        # The binary is the only entry stored as a resolved real path.
        binary = os.path.join(analysis_path, "binary")
        self.file_path = os.path.realpath(binary)

    def run(self):
        """Start processing.
        @raise NotImplementedError: this method is abstract.
        """
        raise NotImplementedError
class Signature(object):
    """Base abstract class for signature.

    The class level attributes below are the defaults every signature starts
    with.
    """

    # Descriptive metadata.
    name = ""
    description = ""
    categories = []
    authors = []
    references = []

    # Rating and switches.
    severity = 1
    alert = False
    enabled = True

    # Lower and upper bound, unset by default.
    # NOTE(review): presumably version limits - confirm against the loader.
    minimum = None
    maximum = None

    def __init__(self):
        # Per instance list, empty on creation.
        self.data = []

    def run(self, results=None):
        """Start signature processing.
        @param results: analysis results.
        @raise NotImplementedError: this method is abstract.
        """
        raise NotImplementedError
class Report(object):
    """Base abstract class for reporting module."""

    def __init__(self):
        self.analysis_path = ""
        self.reports_path = ""
        self.options = None

    def set_path(self, analysis_path):
        """Set analysis folder path.

        Also derives the analysis configuration path and the reports folder
        path, and creates the reports folder.
        @param analysis_path: analysis folder path.
        @raise CuckooReportError: if the reports folder cannot be created.
        """
        self.analysis_path = analysis_path
        self.conf_path = os.path.join(self.analysis_path, "analysis.conf")
        self.reports_path = os.path.join(self.analysis_path, "reports")

        try:
            create_folder(folder=self.reports_path)
        except CuckooOperationalError as e:
            # The error has to be raised: merely instantiating it would
            # silently hide the failure from the caller.
            raise CuckooReportError(e)

    def set_options(self, options):
        """Set report options.
        @param options: report options dict.
        """
        self.options = options

    def run(self):
        """Start report processing.
        @raise NotImplementedError: this method is abstract.
        """
        raise NotImplementedError