This file is part of Polichombr.
(c) 2017 ANSSI-FR
Managers for editing and running yara rules
import re
import yara
from sqlalchemy import or_
from polichombr import app, db
from polichombr.models.yara_rule import YaraRule
from polichombr.models.sample import Sample
from polichombr.models.models import TLPLevel
from polichombr.models.sample import FunctionInfo
from polichombr.controllers.jobpool import YaraJobPool
from polichombr.controllers.family import FamilyController
def run_simple_yara(raw_rule, sample):
Compile and run a yara rule on a given sample
matches = {}
if sample.storage_file is None or sample.storage_file == "":
return False
sample_filepath = sample.storage_file
yara_obj = yara.compile(source=raw_rule)
matches = yara_obj.match(data=open(sample_filepath, "rb").read())
except Exception as e:
app.logger.exception("YARA RULE FAILED: %s" % (e))
if matches:
return True
return False
def filter_funcs_infos(sid, hashes):
Return a query wich filters the func infos
from a sample with given machoc hashes
filters = [int(m_hash[8:], 16) for m_hash in hashes]
filters = FunctionInfo.machoc_hash.in_(filters)
funcs = FunctionInfo.query.filter_by(sample_id=sid)
funcs = funcs.filter(or_(filters))
return funcs
def search_by_yara_regexp(raw_rule, sid, pattern):
Search for a pattern in the extended yara
and return corresponding functions
matches = re.findall(pattern, raw_rule)
if pattern:
funcs = filter_funcs_infos(sid, matches)
return funcs
def run_extended_yara(raw_rule, sample):
Runs an extended Yara rule on a sample. It will just
first parse the "machoc" keywords to pre-select matching
samples, and then apply the yara rules.
Machoc hashes must have been set in the sample, obviously.
prematchs = search_by_yara_regexp(raw_rule,
if prematchs.count() == 0:
return False
noprematchs = search_by_yara_regexp(raw_rule,
if noprematchs.count() > 0:
return False
return run_simple_yara(raw_rule, sample)
class YaraSingleTask(object):
Yara task. Used in the yara job pool. We use this task to
define a yara rule which must be ran on a sample.
def __init__(self, sample, yar):
self.yara_id = yar.id
self.sample_id = sample.id
self.matched = False
def execute(self):
Get the objects, and run the extended yara.
with app.app_context():
yar = YaraRule.query.get(self.yara_id)
sample = Sample.query.get(self.sample_id)
if yar in sample.yaras:
return True
if run_extended_yara(yar.raw_rule, sample) is True:
self.matched = True
return True
def apply_result(self):
Commit the match in the sample.
if self.matched is False:
return True
yar = YaraRule.query.get(self.yara_id)
sample = Sample.query.get(self.sample_id)
if yar in sample.yaras:
return True
for family in yar.families:
if family not in sample.families:
return True
class YaraController(object):
Yara object controller.
jobpool = None
def __init__(self):
Inits also the jobpool. Take care of it as it may spawn new processes.
self.jobpool = YaraJobPool()
def get_all():
Get all yara rules.
return YaraRule.query.all()
def execute_on_sample(self, sample, yar):
Execute rule on sample => add to the jobpool.
y_task = YaraSingleTask(sample, yar)
def create(self, name, raw_data, tlp_level):
Creates a new rule.
Checks the rule before insertion, and executes it on
any database sample.
if TLPLevel.tostring(tlp_level) is None:
return False
if YaraRule.query.filter_by(name=name).count() != 0:
app.logger.error("This rule already exists")
return False
except yara.SyntaxError as error:
app.logger.error("Failed to compile rule %s", name)
return False
yar = YaraRule(name, raw_data, tlp_level)
yar.version = 1
for sample in Sample.query.all():
self.execute_on_sample(sample, yar)
return yar
def get_by_id(yara_id):
Get yara rule by its ID.
return YaraRule.query.get(yara_id)
def get_by_name(name):
Get yara rule by its name.
yar = YaraRule.query.filter_by(name=name)
if yar is None:
return None
return yar.first()
def add_to_item(item, yar):
Add a yara to a family or a sample
if yar in item.yaras:
return True
return True
def add_to_sample(cls, sample, yar):
Adds yara to a sample. Checks before add. Commits also the yara's
attached families.
cls.add_to_item(sample, yar)
cls.propagate_family(sample, yar)
return True
def propagate_family(sample, yar):
Dispatch families from a rule to a sample
for fam in yar.families:
if fam not in sample.families:
FamilyController.add_sample(sample, fam)
def remove_from_family(fam, yar):
Removes yara to family.
if yar in fam.yaras:
return True
def add_to_family(cls, fam, yar):
Adds a yara rule to a family. Also adds the samples. Sensibility
is NOT propagated as a generic yara rule may be used to identify
multiple families.
cls.add_to_item(fam, yar)
for sample in yar.samples:
cls.propagate_family(sample, yar)
return True
def delete(yar):
Removes yara from database.
def rename(new_name, yar):
Change yara name.
yar.name = new_name
return True
def update(rule, new_rule):
Update a rule with a new text.
Also increments the version number
rule.raw_rule = new_rule
rule.version += 1
return True
def set_tlp_level(tlp_level, yar):
Change TLP level.
if TLPLevel.tostring(tlp_level) is None:
return False
yar.TLP_sensibility = tlp_level
return True