websecmap/app/models.py
# coding=UTF-8
# from __future__ import unicode_literals
import importlib
import logging
import re
import celery
from django.contrib.auth.models import User
from django.db import models
from django.utils import timezone
from jsonfield import JSONField
from websecmap.celery import app
log = logging.getLogger(__name__)
def censor_sensitive_data(data):
# Heuristicly remove sensitive data from the task output. This will not cover all cases.
# This will prevent password leakage from database leaks.
# Full Match: pass='.....'
# Group 1: pass=
# Group 2: pass
# Group 3: " (' or ", enclosing)
# Replaces it to fieldname + quote + asteriks + quote
# https://regex101.com/
# @regex101: (note that \3 has to be written as \g3, thus ((pass|password|key|secret|hash|salt)=)(['"]).*?\g3
# The signifier for a secret can be anywhere in the variable name.
# https://stackoverflow.com/questions/10120295/valid-characters-in-a-python-class-name
data = re.sub(
r"(?i)([a-zA-Z0-9_]*?(sessionid|token|pass|password|key|secret|hash|salt)[a-zA-Z0-9_]*?=)(['\"]).*?\3",
r"\1\3********************\3",
data,
flags=re.MULTILINE,
)
return data
class Job(models.Model):
"""Wrap any Celery task to easily 'manage' it from Django."""
name = models.CharField(max_length=255, help_text="name of the job")
task = models.TextField(help_text="celery task signature in string form")
result_id = models.CharField(
unique=True, null=True, blank=True, max_length=255, help_text="celery asyncresult ID for tracing task"
)
status = models.CharField(max_length=255, help_text="status of the job")
result = JSONField(help_text="output of the task as JSON") # JSONfield (not django-jsonfield) does not
# encoder_class=ResultEncoder
created_on = models.DateTimeField(auto_now_add=True, blank=True, null=True, help_text="when task was created")
finished_on = models.DateTimeField(blank=True, null=True, help_text="when task ended")
# TypeError: __init__() missing 1 required positional argument: 'on_delete'
# probably because of blank and/or default.
created_by = models.ForeignKey(
User,
blank=True,
null=True,
on_delete=models.CASCADE,
)
def clean_task(self):
"""Truncate long task signatures as they result in mysql issues.
https://gitlab.com/failmap/server/issues/24
Task signatures are for informational purpose and not functionally required. Currently
there is no reason to keep large signatures so truncating to arbitrary limit of 1k.
"""
data = self.cleaned_data["task"][: 1000 * 1]
return censor_sensitive_data(data)
@classmethod
def create(cls, task: celery.Task, name: str, request, *args, **kwargs) -> "Job":
"""Create job object and publish task on celery queue."""
# create database object
job = cls(task=censor_sensitive_data(str(task)))
if request:
job.created_by = request.user
job.name = name[:255]
job.status = "created"
job.save()
# publish original task which stores the result in this Job object
result_id = (task | cls.store_result.s(job_id=job.id)).apply_async(*args, **kwargs)
# store the task async result ID for reference
job.result_id = result_id.id
job.save(update_fields=["result_id"])
return job
@staticmethod
@app.task(queue="storage")
def store_result(result, job_id=None):
"""Celery task to store result of wrapped task after it has completed."""
job = Job.objects.get(id=job_id)
if not result:
result = "-- task generated no result object --"
job.result = result
job.status = "completed"
job.finished_on = timezone.now()
try:
job.save(update_fields=["result", "status", "finished_on"])
except TypeError:
job.result = "Job returned a '%s' which could not be serialized. Job finished." % type(result)
job.save(update_fields=["result", "status", "finished_on"])
def __str__(self):
return self.name
@app.task(queue="storage")
def create_function_job(function: str, **kwargs) -> int:
"""Helper to allow Jobs to be created using Celery Beat.
function: complete path to a function inside a module. This will be executed.
This function helps when not all tasks have been discovered or are called directly. It sets no requirement to how
a module should look. Anything that composes tasks can be inserted here.
"""
parts = function.split(".")
module = ".".join(parts[0:-1])
function_name = parts[-1]
module = importlib.import_module(module)
call = getattr(module, function_name, None)
if not call:
raise ValueError("Function %s not found in %s." % (function_name, module))
task = call(**kwargs)
job = Job.create(task, function, None)
return job.id
@app.task(queue="storage")
def create_job(task_module: str, **kwargs) -> int:
# will be deleted after all scanners have migrated to the manual / planned approach
module = importlib.import_module(task_module)
task = module.compose_task(**kwargs)
job = Job.create(task, task_module, None)
return job.id
@app.task(queue="storage")
def create_discover_job(task_module: str, **kwargs) -> int:
# will be deleted after all scanners have migrated to the manual / planned approach
module = importlib.import_module(task_module)
task = module.compose_discover_task(**kwargs)
job = Job.create(task, task_module, None)
return job.id
@app.task(queue="storage")
def create_verify_job(task_module: str, **kwargs) -> int:
# will be deleted after all scanners have migrated to the manual / planned approach
module = importlib.import_module(task_module)
task = module.compose_verify_task(**kwargs)
job = Job.create(task, task_module, None)
return job.id
@app.task(queue="storage")
def create_scan_job(task_module: str, **kwargs) -> int:
# will be deleted after all scanners have migrated to the manual / planned approach
module = importlib.import_module(task_module)
task = module.compose_scan_task(**kwargs)
job = Job.create(task, task_module, None)
return job.id
@app.task(queue="storage")
def create_planned_scan_job(task_module: str, **kwargs) -> int:
"""Helper to allow Jobs to be created using Celery Beat.
task_module: module from which to call `compose_discover_task` which results in the task to be executed
"""
module = importlib.import_module(task_module)
task = module.compose_planned_scan_task(**kwargs)
job = Job.create(task, task_module, None)
return job.id
@app.task(queue="storage")
def create_planned_discover_job(task_module: str, **kwargs) -> int:
"""Helper to allow Jobs to be created using Celery Beat.
task_module: module from which to call `compose_discover_task` which results in the task to be executed
"""
module = importlib.import_module(task_module)
task = module.compose_planned_discover_task(**kwargs)
job = Job.create(task, task_module, None)
return job.id
@app.task(queue="storage")
def create_planned_verify_job(task_module: str, **kwargs) -> int:
"""Helper to allow Jobs to be created using Celery Beat.
task_module: module from which to call `compose_discover_task` which results in the task to be executed
"""
module = importlib.import_module(task_module)
task = module.compose_planned_verify_task(**kwargs)
job = Job.create(task, task_module, None)
return job.id
@app.task(queue="storage")
def create_manual_scan_job(task_module: str, **kwargs) -> int:
"""Helper to allow Jobs to be created using Celery Beat.
task_module: module from which to call `compose_discover_task` which results in the task to be executed
"""
module = importlib.import_module(task_module)
task = module.compose_manual_scan_task(**kwargs)
job = Job.create(task, task_module, None)
return job.id
@app.task(queue="storage")
def create_manual_discover_job(task_module: str, **kwargs) -> int:
"""Helper to allow Jobs to be created using Celery Beat.
task_module: module from which to call `compose_discover_task` which results in the task to be executed
"""
module = importlib.import_module(task_module)
task = module.compose_manual_discover_task(**kwargs)
job = Job.create(task, task_module, None)
return job.id
@app.task(queue="storage")
def create_manual_verify_job(task_module: str, **kwargs) -> int:
"""Helper to allow Jobs to be created using Celery Beat.
task_module: module from which to call `compose_discover_task` which results in the task to be executed
"""
module = importlib.import_module(task_module)
task = module.compose_manual_verify_task(**kwargs)
job = Job.create(task, task_module, None)
return job.id
class Volunteer(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE)
organization = models.TextField(max_length=200, blank=True, null=True)
added_by = models.TextField(max_length=200, blank=True, null=True)
notes = models.TextField(max_length=2048, blank=True, null=True)
class GameUser(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE)
# store the password in plain_text, so it's recoverable.
password = models.TextField(max_length=200, blank=True, null=True)