modules/reporting/maec11.py
# Copyright (C) 2010-2012 Cuckoo Sandbox Developers.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# See the file 'docs/LICENSE' for copying permission.
import os
import lib.maec.maec11 as maec
from lib.cuckoo.common.abstracts import Report
from lib.cuckoo.common.exceptions import CuckooReportError
from lib.cuckoo.common.utils import datetime_to_iso
class Report(Report):
"""Generates a MAEC 1.1 report."""
def run(self, results):
"""Writes report.
@param results: Cuckoo results dict.
@raise CuckooReportError: if fails to write report.
"""
self.idMap = {}
# Save results
self.results = results
# Build MAEC doc
self.addBundle()
self.addPools()
self.addAnalysis()
self.addActions()
# Write report.
self.output()
def addBundle(self):
"""Generates MAEC bundle structure."""
self.idMap['prefix'] = "maec:%s" % self.results['file']['md5']
# Generate bundle
self.m = maec.BundleType(
id = "%s:bnd:1" % self.idMap['prefix'],
schema_version='1.1'
)
# Analyses
self.analyses = maec.AnalysesType()
self.m.set_Analyses(self.analyses)
# Actions
self.actions = maec.ActionsType()
self.m.set_Actions(self.actions)
# Behaviors
self.behaviors = maec.BehaviorsType()
self.m.set_Behaviors(self.behaviors)
# Pools
self.pools = maec.PoolsType()
self.m.set_Pools(self.pools)
def getActionId(self):
"""Get a new Action ID.
@return: new ID.
"""
try:
self.actionId = self.actionId + 1
except AttributeError:
self.actionId = 1
return self.actionId
def getObjectId(self):
"""Get a new Object ID.
@requires: new ID.
"""
try:
self.objectId = self.objectId + 1
except AttributeError:
self.objectId = 1
return self.objectId
def getProcessId(self):
"""Get a new Process ID.
@requires: new ID.
"""
try:
self.processId = self.processId + 1
except AttributeError:
self.processId = 1
return self.processId
def getActImpId(self):
"""Get a new Action Implementation ID.
@requires: new ID.
"""
try:
self.actImpId = self.actImpId + 1
except AttributeError:
self.actImpId = 1
return self.actImpId
def getApiCallId(self):
"""Get a new API Call ID.
@requires: new ID.
"""
try:
self.apiCallId = self.apiCallId + 1
except AttributeError:
self.apiCallId = 1
return self.apiCallId
def addActions(self):
"""Adds actions section."""
# Processes
for process in self.results['behavior']['processes']:
self.createActionAPI(process)
# Network
if 'network' in self.results and isinstance(self.results['network'], dict):
if 'udp' in self.results['network'] and isinstance(self.results['network']['udp'], list):
for pkt in self.results['network']['udp']:
self.createActionNet(pkt)
if 'tcp' in self.results['network'] and isinstance(self.results['network']['tcp'], list):
for pkt in self.results['network']['tcp']:
self.createActionNet(pkt)
def createActionNet(self, packet):
"""Create a network action.
@return: action.
"""
act = maec.ActionType(
id = "%s:act:%s" % (self.idMap['prefix'], self.getActionId()),
)
act.set_Action_Initiator(maec.Action_InitiatorType(
type_ = 'Process',
Initiator_Object = maec.ObjectReferenceType(
type_ = 'Object',
object_id = self.idMap['subject']
)
)
)
ai = maec.ActionImplementationType(
type_ = 'Other',
id = "%s:imp:%s" % (self.idMap['prefix'], self.getActImpId()),
)
net = maec.Network_Action_AttributesType(
Internal_Port = packet['sport'],
External_Port = packet['dport'],
Internal_IP_Address = packet['src'],
External_IP_Address = packet['dst']
)
ai.set_Network_Action_Attributes(net)
act.set_Action_Implementation(ai)
self.actions.add_Action(act)
def createActionAPI(self, process):
"""Creates an action object which describes a process.
@param process: process from cuckoo dict.
"""
pid = self.getProcessId()
pos = 1
for call in process['calls']:
act = maec.ActionType(
id = "%s:act:%s" % (self.idMap['prefix'], self.getActionId()),
ordinal_position = pos,
timestamp = call['timestamp'],
successful = call['category']
)
try:
initiator = self.idMap[process['process_name']]
except KeyError:
initiator = self.idMap['subject']
act.set_Action_Initiator(maec.Action_InitiatorType(
type_ = 'Process',
Initiator_Object = maec.ObjectReferenceType(
type_ = 'Object',
object_id = initiator
)
)
)
ai = maec.ActionImplementationType(
type_ = 'API_Call',
id = "%s:imp:%s" % (self.idMap['prefix'], self.getActImpId()),
)
apicall = maec.APICallType(
id = "%s:api:%s" % (self.idMap['prefix'], self.getApiCallId()),
apifunction_name = call['api'],
ReturnValue = call['return']
)
apos = 1
for arg in call['arguments']:
apicall.add_APICall_Parameter(maec.APICall_ParameterType(
ordinal_position = apos,
Name = arg['name'],
Value = arg['value']
)
)
apos = apos + 1
ai.set_API_Call(apicall)
act.set_Action_Implementation(ai)
self.actions.add_Action(act)
pos = pos + 1
def createFileObj(self, file):
"""Creates a File object.
@param file: file dict from Cuckoo dict.
@requires: file object.
"""
obj = maec.ObjectType(
id = '%s:obj:%s' % (self.idMap['prefix'], self.getObjectId()),
object_name = file['name'],
type_ = "File"
)
self.idMap[file['name']] = obj.id
fs = maec.File_System_Object_AttributesType()
fs.set_File_Type(maec.File_TypeType(
type_ = file['type']
)
)
# Add static analysis if file obj is analysis subject.
if file['md5'] == self.results['file']['md5'] and len(self.results['static']) > 0:
pe = maec.PE_Binary_AttributesType(dll_count = self.results['static']['imported_dll_count'])
# PE exports
if len(self.results['static']['pe_exports']) > 0:
exports = maec.ExportsType()
pe.set_Exports(exports)
for x in self.results['static']['pe_exports']:
exp = maec.PEExportType(
Function_Name = x['name'],
Ordinal = x['ordinal'],
Entry_Point = x['address']
)
exports.add_Export(exp)
# PE Imports
if len(self.results['static']['pe_imports']) > 0:
imports = maec.ImportsType()
pe.set_Imports(imports)
for x in self.results['static']['pe_imports']:
imp = maec.PEImportType(
File_Name = x['dll']
)
# Imported functions
funcs = maec.Imported_FunctionsType()
imp.set_Imported_Functions(funcs)
for i in x['imports']:
f = maec.Imported_FunctionType(
Function_Name = i['name'],
Virtual_Address = i['address']
)
funcs.add_Imported_Function(f)
imports.add_Import(imp)
# Resources
if len(self.results['static']['pe_resources']) > 0:
resources = maec.ResourcesType()
pe.set_Resources(resources)
for r in self.results['static']['pe_resources']:
res = maec.PEResourceType(
Name = r['name']
)
resources.add_Resource(res)
# Sections
if len(self.results['static']['pe_sections']) > 0:
sections = maec.SectionsType()
pe.set_Sections(sections)
for s in self.results['static']['pe_sections']:
sec = maec.PESectionType(
Virtual_Size = int(s['virtual_size'], 16),
Virtual_Address = s['virtual_address'],
Entropy = s['entropy'],
Section_Name = s['name']
)
sections.add_Section(sec)
# Version info
if len(self.results['static']['pe_versioninfo']) > 0:
version = maec.Version_BlockType()
pe.set_Version_Block(version)
for k in self.results['static']['pe_versioninfo']:
if k['name'] == 'ProductVersion':
version.set_Product_Version_Text(k['value'])
if k['name'] == 'ProductName':
version.set_Product_Name(k['value'])
if k['name'] == 'FileVersion':
version.set_File_Version_Text(k['value'])
if k['name'] == 'CompanyName':
version.set_Company_Name(k['value'])
if k['name'] == 'OriginalFilename':
version.set_Original_File_Name(k['value'])
fs.set_File_Type_Attributes(maec.File_Type_AttributesType(pe))
h = maec.HashesType()
h.add_Hash(maec.HashType(
type_ = 'MD5',
Hash_Value = file['md5']
))
h.add_Hash(maec.HashType(
type_ = 'SHA1',
Hash_Value = file['sha1']
))
h.add_Hash(maec.HashType(
type_ = 'SHA256',
Hash_Value = file['sha256']
))
h.add_Hash(maec.HashType(
type_ = 'Other',
other_type = 'SHA512',
Hash_Value = file['sha512']
))
h.add_Hash(maec.HashType(
type_ = 'Other',
other_type = 'CRC32',
Hash_Value = file['crc32']
))
h.add_Hash(maec.HashType(
type_ = 'Other',
other_type = 'SSDEEP',
Hash_Value = file['ssdeep']
))
fs.set_Hashes(h)
obj.set_File_System_Object_Attributes(fs)
obj.set_Object_Size(maec.Object_SizeType(
units = 'Bytes',
valueOf_ = file['size']
))
return obj
def createSubject(self, file):
"""Create a subject entity.
@param file: file as in cuckoo dict.
@return: subject object.
"""
subject = maec.SubjectType()
subject.set_Object_Reference(maec.ObjectReferenceType(
type_ = 'Object',
object_id = self.idMap[file['name']]
)
)
self.idMap['subject'] = self.idMap[file['name']]
return subject
def createTools(self):
"""Creates a tools element.
@return: Tools object.
"""
tools = maec.Tools_UsedType()
tool = maec.ToolType(
id = "%s:tol:1" % self.idMap['prefix'],
Name = 'Cuckoo Sandbox',
Version = self.results['info']['version'],
Organization = 'http://www.cuckoosandbox.org'
)
tools.add_Tool(tool)
return tools
def addAnalysis(self):
"""Adds analysis header."""
analysis = maec.AnalysisType(
id = "%s:ana:1" % self.idMap['prefix'],
analysis_method = 'Dynamic',
start_datetime = datetime_to_iso(self.results["info"]["started"]),
complete_datetime = datetime_to_iso(self.results["info"]["ended"]),
lastupdate_datetime = datetime_to_iso(self.results["info"]["ended"])
)
# Add tool
analysis.set_Tools_Used(self.createTools())
# Add subject
analysis.add_Subject(self.createSubject(self.results['file']))
self.analyses.add_Analysis(analysis)
def addPools(self):
"""Adds Pools section."""
objs = self.results['dropped']
objs.append(self.results['file'])
pool = maec.Object_PoolType()
for file in objs:
pool.add_Object(self.createFileObj(file))
self.pools.set_Object_Pool(pool)
def output(self):
"""Writes report to disk."""
try:
report = open(os.path.join(self.reports_path, "report.maec-1.1.xml"), "w")
report.write('<?xml version="1.0" ?>\n')
report.write('<!--\n')
report.write('Cuckoo Sandbox MAEC 1.1 malware analysis report\n')
report.write('http://www.cuckoosandbox.org\n')
report.write('-->\n')
self.m.export(report, 0, namespace_ = '', name_ = 'MAEC_Bundle', namespacedef_ = 'xsi:schemaLocation="http://maec.mitre.org/XMLSchema/maec-core-1 file:MAEC_v1.1.xsd"')
report.close()
except (TypeError, IOError) as e:
raise CuckooReportError("Failed to generate MAEC 1.1 report: %s" % e)