modules/processing/static.py
# Copyright (C) 2010-2012 Cuckoo Sandbox Developers.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# See the file 'docs/LICENSE' for copying permission.
import os
import sys
import string
try:
import magic
HAVE_MAGIC = True
except:
HAVE_MAGIC = False
from lib.cuckoo.common.constants import CUCKOO_ROOT
from lib.cuckoo.common.abstracts import Processing
from lib.cuckoo.common.utils import convert_to_printable, File
import lib.pefile.pefile as pefile
import lib.pefile.peutils as peutils
# Partially taken from http://malwarecookbook.googlecode.com/svn/trunk/3/8/pescanner.py
class PortableExecutable:
"""PE analysis.
@note: Partially taken from http://malwarecookbook.googlecode.com/svn/trunk/3/8/pescanner.py.
"""
def __init__(self, file_path):
"""@param file_path: file path."""
self.file_path = file_path
self.pe = None
def _get_filetype(self, data):
"""Gets filetype, uses libmagic if available.
@param data: data to be analyzed.
@return: file type or None.
"""
if not IS_MAGIC:
return None
try:
ms = magic.open(magic.MAGIC_NONE)
ms.load()
file_type = ms.buffer(data)
except:
try:
file_type = magic.from_buffer(data)
except Exception:
return None
return file_type
def _get_peid_signatures(self):
"""Gets PEID signatures.
@return: matched signatures or None.
"""
if not self.pe:
return None
try:
signatures = peutils.SignatureDatabase(os.path.join(CUCKOO_ROOT, "data", "peutils" , "UserDB.TXT"))
return signatures.match(self.pe, ep_only = True)
except:
return None
def _get_imported_symbols(self):
"""Gets imported symbols.
@return: imported symbols dict or None.
"""
if not self.pe:
return None
imports = []
if hasattr(self.pe, "DIRECTORY_ENTRY_IMPORT"):
for entry in self.pe.DIRECTORY_ENTRY_IMPORT:
try:
symbols = []
for imported_symbol in entry.imports:
symbol = {}
symbol["address"] = hex(imported_symbol.address)
symbol["name"] = imported_symbol.name
symbols.append(symbol)
imports_section = {}
imports_section["dll"] = entry.dll
imports_section["imports"] = symbols
imports.append(imports_section)
except:
continue
return imports
def _get_exported_symbols(self):
"""Gets exported symbols.
@return: exported symbols dict or None.
"""
if not self.pe:
return None
exports = []
if hasattr(self.pe, "DIRECTORY_ENTRY_EXPORT"):
for exported_symbol in self.pe.DIRECTORY_ENTRY_EXPORT.symbols:
symbol = {}
symbol["address"] = hex(self.pe.OPTIONAL_HEADER.ImageBase + exported_symbol.address)
symbol["name"] = exported_symbol.name
symbol["ordinal"] = exported_symbol.ordinal
exports.append(symbol)
return exports
def _get_sections(self):
"""Gets sections.
@return: sections dict or None.
"""
if not self.pe:
return None
sections = []
for entry in self.pe.sections:
try:
section = {}
section["name"] = convert_to_printable(entry.Name.strip("\x00"))
section["virtual_address"] = hex(entry.VirtualAddress)
section["virtual_size"] = hex(entry.Misc_VirtualSize)
section["size_of_data"] = hex(entry.SizeOfRawData)
section["entropy"] = entry.get_entropy()
sections.append(section)
except:
continue
return sections
def _get_resources(self):
"""Get resources.
@return: resources dict or None.
"""
if not self.pe:
return None
resources = []
if hasattr(self.pe, "DIRECTORY_ENTRY_RESOURCE"):
for resource_type in self.pe.DIRECTORY_ENTRY_RESOURCE.entries:
try:
resource = {}
if resource_type.name is not None:
name = "%s" % resource_type.name
else:
name = "%s" % pefile.RESOURCE_TYPE.get(resource_type.struct.Id)
if name == None:
name = "%d" % resource_type.struct.Id
if hasattr(resource_type, "directory"):
for resource_id in resource_type.directory.entries:
if hasattr(resource_id, "directory"):
for resource_lang in resource_id.directory.entries:
data = self.pe.get_data(resource_lang.data.struct.OffsetToData, resource_lang.data.struct.Size)
filetype = self._get_filetype(data)
language = pefile.LANG.get(resource_lang.data.lang, None)
sublanguage = pefile.get_sublang_name_for_lang(resource_lang.data.lang, resource_lang.data.sublang)
resource["name"] = name
resource["offset"] = ("%-8s" % hex(resource_lang.data.struct.OffsetToData)).strip()
resource["size"] = ("%-8s" % hex(resource_lang.data.struct.Size)).strip()
resource["filetype"] = filetype
resource["language"] = language
resource["sublanguage"] = sublanguage
resources.append(resource)
except:
continue
return resources
def _get_versioninfo(self):
"""Get version info.
@return: info dict or None.
"""
if not self.pe:
return None
infos = []
if hasattr(self.pe, "VS_VERSIONINFO"):
if hasattr(self.pe, "FileInfo"):
for entry in self.pe.FileInfo:
try:
if hasattr(entry, "StringTable"):
for st_entry in entry.StringTable:
for str_entry in st_entry.entries.items():
entry = {}
entry["name"] = convert_to_printable(str_entry[0])
entry["value"] = convert_to_printable(str_entry[1])
infos.append(entry)
elif hasattr(entry, "Var"):
for var_entry in entry.Var:
if hasattr(var_entry, "entry"):
entry = {}
entry["name"] = convert_to_printable(var_entry.entry.keys()[0])
entry["value"] = convert_to_printable(var_entry.entry.values()[0])
infos.append(entry)
except:
continue
return infos
def run(self):
"""Run analysis.
@return: analysis results dict or None.
"""
if not os.path.exists(self.file_path):
return None
try:
self.pe = pefile.PE(self.file_path)
except pefile.PEFormatError:
return None
results = {}
results["peid_signatures"] = self._get_peid_signatures()
results["pe_imports"] = self._get_imported_symbols()
results["pe_exports"] = self._get_exported_symbols()
results["pe_sections"] = self._get_sections()
results["pe_resources"] = self._get_resources()
results["pe_versioninfo"] = self._get_versioninfo()
results["imported_dll_count"] = len([x for x in results["pe_imports"] if "dll" in x and x['dll'] is not None ])
return results
class Static(Processing):
"""Static analysis."""
def run(self):
"""Run analysis.
@return: results dict.
"""
self.key = "static"
static = {}
file_type = File(self.file_path).get_type()
if file_type:
if "PE32" in file_type:
static = PortableExecutable(self.file_path).run()
return static