app/runner.py
#!/usr/bin/python
import base64
import donlib
import io
import cortexlib
import os
import sys
import threading
import time
from halocelery import tasks
from halocelery.apputils import Utility as util
from collections import deque
def main():
global slack_inbound
global slack_outbound
global health_string
global health_last_event_timestamp
global async_jobs
async_jobs = deque([])
health_last_event_timestamp = ""
health_string = ""
slack_inbound = deque([])
slack_outbound = deque([])
config = donlib.ConfigHelper()
"""First we make sure that all configs are sound..."""
check_configs(config)
"""Next, we start the Slack ingestion thread..."""
slack_consumer = threading.Thread(target=slack_in_manager, args=[config])
slack_consumer.daemon = True
slack_consumer.start()
"""Now, we start the Slack emitter..."""
slack_emitter = threading.Thread(target=slack_out_manager, args=[config])
slack_emitter.daemon = True
slack_emitter.start()
"""Next, our asynchronous job manager gets crunk"""
async_mgr = threading.Thread(target=async_manager, args=[config])
async_mgr.daemon = True
async_mgr.start()
"""Finally, we start up the Daemon Speaker"""
halo_enricher = threading.Thread(target=daemon_speaker, args=[config])
halo_enricher.daemon = True
halo_enricher.start()
msg = "Starting Don-Bot v%s\nName is set to %s" % (donlib.__version__,
config.slack_username)
util.log_stdout(msg)
msg = "Don-Bot sends general notifications to #%s" % config.slack_channel
util.log_stdout(msg)
if config.monitor_events == "yes":
util.log_stdout("Starting Halo event monitor")
halo_collector = threading.Thread(target=event_connector,
args=[config])
halo_collector.daemon = True
halo_collector.start()
while True:
s_consumer = (" Slack consumer alive: %s" %
str(slack_consumer.is_alive()))
s_emitter = (" Slack emitter alive: %s" %
str(slack_emitter.is_alive()))
h_enricher = (" Halo enricher alive: %s" %
str(halo_enricher.is_alive()))
a_manager = (" Async job manager alive: %s" %
str(async_mgr.is_alive()))
if config.monitor_events == "yes":
h_events = " Halo event monitor alive: %s\n Last event: %s" % (
halo_collector.is_alive(), health_last_event_timestamp)
else:
h_events = ""
health_string = "\n".join([s_consumer, s_emitter, h_enricher,
a_manager, h_events])
die_if_unhealthy(config.slack_channel)
time.sleep(30)
def die_if_unhealthy(slack_channel):
if "False" in health_string:
msg = health_string
msg += "\n\nDetected trouble in bot (see above). Bot app will restart."
channel = slack_channel
sad_note = (channel, msg)
slack_outbound.append(sad_note)
time.sleep(5)
sys.exit(2)
else:
pass
def event_connector(config):
global health_last_event_timestamp
halo = donlib.Halo(config, str(health_string), tasks)
events = donlib.HaloEvents(config)
quarantine = cortexlib.Quarantine(config)
ipblock = cortexlib.IpBlockCheck(config)
quarantine_check = False
ip_block_check = False
# We add a short delay in case of time drift between container and API
time.sleep(10)
while True:
for event in events:
quarantine_check = quarantine.should_quarantine(event)
ip_block_check = ipblock.should_block_ip(event)
health_last_event_timestamp = event["created_at"]
if not donlib.Utility.is_suppressed_event_type(config, event):
if donlib.Utility.event_is_critical(event):
util.log_stdout("EVENT_CONNECTOR: Critical event detected!") # NOQA
event_fmt = donlib.Formatter.format_item(event, "event")
slack_outbound.append((config.slack_channel, event_fmt))
if quarantine_check is not False:
async_jobs.append((config.slack_channel,
halo.quarantine_server(event)))
if ip_block_check is not False:
target_ip = ipblock.extract_ip_from_event(event)
target_zone_name = ipblock.ip_zone_name
async_jobs.append((config.slack_channel,
halo.add_ip_to_blocklist(target_ip,
target_zone_name)))
def daemon_speaker(config):
while True:
celery_tasks = tasks
halo = donlib.Halo(config, str(health_string), celery_tasks)
try:
message = slack_inbound.popleft()
channel = message["channel"]
halo_query, target = donlib.Lexicals.parse(message)
halo_results = halo.interrogate(halo_query, target)
util.log_stdout("DAEMON_SPEAKER: Results object type:%s" %
type(halo_results))
if isinstance(halo_results, (str, str)):
slack_outbound.append((channel, halo_results))
else:
util.log_stdout("DAEMON_SPEAKER: queueing up async job")
async_jobs.append((channel, halo_results))
except IndexError:
time.sleep(1)
return
def async_manager(config):
while True:
try:
job = async_jobs.popleft()
if job[1].ready():
if job[1].successful():
outbound_construct = (job[0], job[1].result)
slack_outbound.append(outbound_construct)
job[1].forget()
elif job[1].failed():
outbound_construct = (job[0], "REQUEST FAILED")
slack_outbound.append(outbound_construct)
job[1].forget()
else: # If not successful and not failed, throw it back.
async_jobs.append(job)
else:
async_jobs.append(job)
time.sleep(1)
except IndexError:
time.sleep(1)
def slack_in_manager(config):
slack = donlib.Slack(config)
for message in slack:
util.log_stdout("Message in slack consumer")
slack_inbound.append(message)
def slack_out_manager(config):
slack = donlib.Slack(config)
while True:
try:
message = slack_outbound.popleft()
try:
# Attempt to decode from Base64.
if "\n" in message[1]:
raise TypeError("Detected plaintext response...")
dec_msg = base64.decodestring(message[1])
util.log_stdout("Detected Base64-encoded file...")
slack.send_file(message[0], io.BytesIO(dec_msg).read(),
"Daemonic File")
except TypeError as e:
util.log_stdout(e)
slack.send_report(message[0], message[1],
"Daemonic Report")
except IndexError:
time.sleep(1)
def check_configs(config):
halo = donlib.Halo(config, "", "")
if halo.credentials_work() is False:
util.log_stdout("Halo credentials are bad! Exiting!")
sys.exit(1)
# If NOSLACK env var is set, don't go any further!
if os.getenv("NOSLACK"):
noslack_hold()
if config.sane() is False:
util.log_stdout("Configuration is bad! Exiting!")
sys.exit(1)
slack = donlib.Slack(config)
if slack.credentials_work() is False:
util.log_stdout("Slack credentials are bad! Exiting!")
sys.exit(1)
def noslack_hold():
msg = ("Slack integration is disabled. "
"Interact with Halo using:"
" 'docker exec -it cortex-bot python /app/interrogate.py'")
while True:
util.log_stdout(msg)
time.sleep(3600)
if __name__ == "__main__":
main()