# data/update.py
##
# This file must be run as a module in order for it to access
# modules in sibling directories.
#
# Run with:
# python -m data.update
import subprocess
import datetime
import os
import ujson
# Import all the constants from data/env.py.
from data.env import *
# Import processing just for the function call.
import data.processing
from data import logger
LOGGER = logger.get_logger(__name__)
# Orchestrate the overall regular Pulse update process.
#
# Steps:
#
# 1. Kick off domain-scan to scan each domain for each measured thing.
# - Should drop results into data/output/parents (or a symlink).
# - If exits with non-0 code, this should exit with non-0 code.
#
# 1a. Subdomains.
# - Gather latest subdomains from public sources, into one condensed deduped file.
# - Run pshtt and sslyze on gathered subdomains.
# - This creates 2 resulting CSVs: pshtt.csv and sslyze.csv
#
# 2. Run processing.py to generate front-end-ready data as data/db.json.
#
# 3. Upload data to S3.
# - Depends on the AWS CLI and access credentials already being configured.
# - TODO: Consider moving from aws CLI to Python library.
# Options:
# --date: override date, defaults to contents of meta.json
# --scan=[skip,download,here]
# skip: skip all scanning, assume CSVs are locally cached
# download: download scan data from S3
# here: run the default full scan
# --upload: upload scan data and the resulting db.json to S3
# --gather=[skip,here]
# skip: skip gathering, assume CSVs are locally cached
# here: run the default full gather
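#
# Example invocations (flags as documented above; paths, buckets, and
# credentials are assumed to be configured in data/env.py):
#
#   python -m data.update --scan=here --gather=here --upload
#   python -m data.update --scan=download
#   python -m data.update --scan=skip --gather=skip --date=2017-05-24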
def run(options):
# If this is just being used to download production data, do that.
if options.get("just-download", False):
download_s3()
return
    # Definitive scan date for the run. (Currently unused: the effective
    # date below comes from --date or the scan's meta.json.)
    today = datetime.datetime.now().strftime("%Y-%m-%d")
# 1. Download scan data, do a new scan, or skip altogether.
scan_mode = options.get("scan", "skip")
# Whether to gather domains (defaults to doing so).
gather_mode = options.get("gather", "here")
if scan_mode == "here":
# 1a. Gather .gov federal subdomains.
if gather_mode == "here":
LOGGER.info("Gathering subdomains.")
gather_subdomains(options)
LOGGER.info("Subdomain gathering complete.")
elif gather_mode == "skip":
LOGGER.info("Skipping subdomain gathering.")
# 1b. Scan subdomains for some types of things.
LOGGER.info("Scanning subdomains.")
scan_subdomains(options)
LOGGER.info("Subdomain scanning complete")
# 1c. Scan parent domains for all types of things.
LOGGER.info("Scanning parent domains.")
scan_parents(options)
LOGGER.info("Scan of parent domains complete.")
elif scan_mode == "download":
LOGGER.info("Downloading latest production scan data from S3.")
download_s3()
LOGGER.info("Download complete.")
# Sanity check to make sure we have what we need.
if not os.path.exists(os.path.join(PARENTS_RESULTS, "meta.json")):
LOGGER.info("No scan metadata downloaded, aborting.")
exit()
# Date can be overridden if need be, but defaults to meta.json.
if options.get("date", None) is not None:
the_date = options.get("date")
else:
        # Depends on YYYY-MM-DD coming first in meta.json's time format.
        with open(os.path.join(PARENTS_RESULTS, "meta.json")) as meta_file:
            scan_meta = ujson.load(meta_file)
        the_date = scan_meta['start_time'][0:10]
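        # For reference, start_time is assumed to look something like
        # "2017-05-24T04:32:00", so its first ten characters are the
        # YYYY-MM-DD date sliced off above.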
# 2. Process and load data into Pulse's database.
LOGGER.info("[%s] Loading data into Pulse." % the_date)
data.processing.run(the_date, options)
LOGGER.info("[%s] Data now loaded into Pulse." % the_date)
# 3. Upload data to S3 (if requested).
if options.get("upload", False):
LOGGER.info("[%s] Syncing scan data and database to S3." % the_date)
upload_s3(the_date)
LOGGER.info("[%s] Scan data and database now in S3." % the_date)
LOGGER.info("[%s] All done." % the_date)
# Upload the scan + processed data to /live/ and /archive/ locations by date.
def upload_s3(date):
# Used for all operations.
acl = "--acl=public-read"
# Used when uploading to the live/ dir.
delete = "--delete"
live_parents = "s3://%s/live/parents/" % BUCKET_NAME
live_subdomains = "s3://%s/live/subdomains/" % BUCKET_NAME
live_db = "s3://%s/live/db/" % BUCKET_NAME
shell_out(["aws", "s3", "sync", PARENTS_DATA, live_parents, acl, delete])
shell_out(["aws", "s3", "sync", SUBDOMAIN_DATA, live_subdomains, acl, delete])
shell_out(["aws", "s3", "cp", DB_DATA, live_db, acl])
# Then copy the entire live directory to a dated archive.
# Ask S3 to do the copying, to save on time and bandwidth.
live = "s3://%s/live/" % (BUCKET_NAME)
archive = "s3://%s/archive/%s/" % (BUCKET_NAME, date)
shell_out(["aws", "s3", "sync", live, archive, acl])
# Makes use of the public URLs so that this can be run in a dev
# environment that doesn't have write credentials to the bucket.
def download_s3():
def download(src, dest):
# remote sources relative to bucket
url = "https://s3-%s.amazonaws.com/%s/%s" % (AWS_REGION, BUCKET_NAME, src)
# local destinations are relative to data/
path = os.path.join(DATA_DIR, dest)
shell_out(["curl", url, "--output", path])
# Ensure the destination directories are present.
os.makedirs(os.path.join(PARENTS_DATA, "results"), exist_ok=True)
os.makedirs(os.path.join(SUBDOMAIN_DATA_GATHERED, "results"), exist_ok=True)
os.makedirs(os.path.join(SUBDOMAIN_DATA_SCANNED, "results"), exist_ok=True)
# Use cURL to download files.
# Don't rely on aws being configured, and surface any permission issues.
#
# Just grab results, not all the cached data.
# Results of parent domain scanning.
for scanner in SCANNERS:
download("live/parents/results/%s.csv" % scanner, "output/parents/results/%s.csv" % scanner)
download("live/parents/results/meta.json", "output/parents/results/meta.json")
# Results of subdomain scanning.
for scanner in SUBDOMAIN_SCANNERS:
download("live/subdomains/scan/results/%s.csv" % scanner, "output/subdomains/scan/results/%s.csv" % scanner)
download("live/subdomains/scan/results/meta.json", "output/subdomains/scan/results/meta.json")
# Results of subdomain gathering.
download("live/subdomains/gather/results/gathered.csv", "output/subdomains/gather/results/gathered.csv")
download("live/subdomains/gather/results/meta.json", "output/subdomains/gather/results/meta.json")
# Also download the latest compiled DB.
# This will be overwritten immediately if this is being used in
# the context of a full data load/processing.
download("live/db/db.json", "db.json")
# Use domain-scan to scan .gov domains from the set domain URL.
# Drop the output into data/output/parents/results.
def scan_parents(options):
scanners = "--scan=%s" % (str.join(",", SCANNERS))
analytics = "--analytics=%s" % ANALYTICS_URL
output = "--output=%s" % PARENTS_DATA
a11y_redirects = "--a11y-redirects=%s" % A11Y_REDIRECTS
a11y_config = "--a11y-config=%s" % A11Y_CONFIG
    full_command = [
SCAN_COMMAND, DOMAINS,
scanners,
analytics, a11y_config, a11y_redirects,
output,
# "--debug", # always capture full output
"--sort",
"--meta"
]
# Allow some options passed to python -m data.update to go
# through to domain-scan.
# Boolean flags.
for flag in ["cache", "serial", "lambda"]:
value = options.get(flag)
if value:
full_command += ["--%s" % flag]
# Flags with values.
for flag in ["lambda-profile"]:
value = options.get(flag)
if value:
full_command += ["--%s=%s" % (flag, str(value))]
# Until third_parties and a11y are moved to Lambda, can't
# do Lambda-sized worker count. Stick with default (10).
# if options.get("lambda"):
# full_command += ["--workers=%i" % LAMBDA_WORKERS]
shell_out(full_command)
# Use domain-scan to gather .gov domains from public sources.
def gather_subdomains(options):
LOGGER.info("[gather] Gathering subdomains.")
full_command = [GATHER_COMMAND]
full_command += [",".join(GATHERER_NAMES)]
full_command += GATHERER_OPTIONS
# Common to all gatherers.
# --parents gets auto-included as its own gatherer source.
full_command += [
"--output=%s" % SUBDOMAIN_DATA_GATHERED,
"--suffix=%s" % GATHER_SUFFIXES,
"--parents=%s" % DOMAINS,
"--ignore-www",
"--sort",
"--debug" # always capture full output
]
# Allow some options passed to python -m data.update to go
# through to domain-scan.
for flag in ["cache"]:
if options.get(flag):
full_command += ["--%s" % flag]
shell_out(full_command)
# Run the subdomain scanners (pshtt and sslyze) on each gathered set of subdomains.
def scan_subdomains(options):
LOGGER.info("[scan] Scanning subdomains.")
subdomains = os.path.join(SUBDOMAIN_DATA_GATHERED, "results", "gathered.csv")
full_command = [
SCAN_COMMAND,
subdomains,
"--scan=%s" % str.join(",", SUBDOMAIN_SCANNERS),
"--output=%s" % SUBDOMAIN_DATA_SCANNED,
# "--debug", # always capture full output
"--sort",
"--meta"
]
# Allow some options passed to python -m data.update to go
# through to domain-scan.
# Boolean flags.
for flag in ["cache", "serial", "lambda"]:
value = options.get(flag)
if value:
full_command += ["--%s" % flag]
# Flags with values.
for flag in ["lambda-profile"]:
value = options.get(flag)
if value:
full_command += ["--%s=%s" % (flag, str(value))]
# If Lambda mode is on, use way more workers.
if options.get("lambda") and (options.get("serial", None) is None):
full_command += ["--workers=%i" % LAMBDA_WORKERS]
shell_out(full_command)
## Utility for shelling out: log the command, run it, return its decoded
## output, and exit non-zero on failure.
def shell_out(command, env=None):
try:
LOGGER.info("[cmd] %s" % str.join(" ", command))
response = subprocess.check_output(command, shell=False, env=env)
output = str(response, encoding='UTF-8')
LOGGER.info(output)
return output
    except subprocess.CalledProcessError:
        LOGGER.critical("Error running %s." % (str(command)))
        exit(1)
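# For example, shell_out(["aws", "s3", "ls"]) logs '[cmd] aws s3 ls', runs it,
# logs its stdout, and returns that stdout as a str (exiting with code 1 if
# the command fails).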
### Run when executed.
if __name__ == '__main__':
run(options())