gatherers/censys.py
import csv
import json
import logging
import os
from typing import List, Optional
from google.cloud import bigquery
from google.oauth2 import service_account
import google.api_core.exceptions
from gatherers.gathererabc import Gatherer
from utils import utils
# Gathers hostnames from Censys.io via the Google BigQuery API.
#
# Options:
#
# --timeout: Override the 10 minute job timeout (specify in seconds).
# --cache: Use locally cached export data instead of hitting BigQuery.
#
# Before using this, you need to:
#
# * create a Project in Google Cloud, and an associated service account
# with access to create new jobs/queries and get their results.
# * give Censys.io this Google Cloud service account to grant access to.
#
# For details on concepts, and how to test access in the web console:
#
# * https://support.censys.io/google-bigquery/bigquery-introduction
# * https://support.censys.io/google-bigquery/adding-censys-datasets-to-bigquery
#
# Note that the web console access is based on access given to a Google account,
# but BigQuery API access via this script depends on access given to
# Google Cloud *service account* credentials.
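#
# Credentials are read from the environment (see load_credentials, below):
#
# * BIGQUERY_CREDENTIALS: the service account key itself, as a JSON string.
# * BIGQUERY_CREDENTIALS_PATH: path to the service account key JSON file.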
# Defaults to 10 minute timeout.
default_timeout = 60 * 10
class Gatherer(Gatherer):
def gather(self):
# Returns a parsed, processed Google service credentials object.
credentials = load_credentials()
if credentials is None:
logging.warning("No BigQuery credentials provided.")
logging.warning("Set BIGQUERY_CREDENTIALS or BIGQUERY_CREDENTIALS_PATH environment variables.")
exit(1)
        # When instantiated this way, the client won't pull the project_id
        # out of the credentials; it has to be set explicitly.
client = bigquery.Client(
project=credentials.project_id,
credentials=credentials
)
# Allow override of default timeout (in seconds).
timeout = int(self.options.get("timeout", default_timeout))
# Construct the query.
query = query_for(self.suffixes)
logging.debug("Censys query:\n%s\n" % query)
# Plan to store in cache/censys/export.csv.
download_path = utils.cache_path(
"export", "censys", ext="csv", cache_dir=self.cache_dir)
# Reuse of cached data can be turned on with --cache.
cache = self.options.get("cache", False)
if (cache is True) and os.path.exists(download_path):
logging.warning("Using cached download data.")
# But by default, fetch new data from the BigQuery API,
# and write it to the expected download location.
else:
# Ensure cache destination exists.
utils.mkdir_p(os.path.dirname(download_path))
logging.warning("Kicking off SQL query job.")
rows = None
# Actually execute the query.
try:
# Executes query and loads all results into memory.
query_job = client.query(query)
iterator = query_job.result(timeout=timeout)
rows = list(iterator)
            except google.api_core.exceptions.Forbidden:
                logging.warning("Access denied to Censys' BigQuery tables.")
                exit(1)
            except Exception:
                logging.warning(utils.format_last_exception())
                logging.warning("Error talking to BigQuery, aborting.")
                exit(1)
            # At this point, the query is complete and its rows are in
            # memory. Cache them to disk as CSV.
logging.warning("Caching results of SQL query.")
            with open(download_path, 'w', newline='') as download_file:
                download_writer = csv.writer(download_file)
                download_writer.writerow(["Domain"])  # will be skipped on read

                # Write the rows out as they were returned (dupes and all),
                # to be de-duped by the central gathering script.
                for row in rows:
                    domains = row['common_name'] + row['dns_names']
                    for domain in domains:
                        download_writer.writerow([domain])
# Whether we downloaded it fresh or not, read from the cached data.
for domain in utils.load_domains(download_path):
if domain:
yield domain
# Constructs the query to run in BigQuery, against Censys'
# certificate datasets, for one or more suffixes.
#
# Example query:
#
# SELECT
# parsed.subject.common_name,
# parsed.extensions.subject_alt_name.dns_names
# FROM
# `censys-io.certificates_public.certificates`,
# UNNEST(parsed.subject.common_name) AS common_names,
# UNNEST(parsed.extensions.subject_alt_name.dns_names) AS sans
# WHERE
# (common_names LIKE "%.gov"
# OR sans LIKE "%.gov")
# OR (common_names LIKE "%.fed.us"
# OR sans LIKE "%.fed.us");
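#
# e.g. query_for([".gov", ".fed.us"]) produces essentially the query above
# (without the trailing semicolon).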
def query_for(suffixes: List[str]) -> str:
select = "\n".join([
" parsed.subject.common_name,",
" parsed.extensions.subject_alt_name.dns_names",
])
from_clause = "\n".join([
" `censys-io.certificates_public.certificates`,",
" UNNEST(parsed.subject.common_name) AS common_names,",
" UNNEST(parsed.extensions.subject_alt_name.dns_names) AS sans",
])
# Returns query fragment for a specific suffix.
def suffix_query(suffix):
return "\n".join([
"(common_names LIKE \"%%%s\"" % suffix,
" OR sans LIKE \"%%%s\")" % suffix,
])
# Join the individual suffix clauses into one WHERE clause.
where = str.join("\n OR ", [suffix_query(suffix) for suffix in suffixes])
query = "\n".join([
"SELECT",
select,
"FROM",
from_clause,
"WHERE",
" %s" % where
])
return query
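# Reads a value directly from the env_var environment variable, or, failing
# that, from the file named by the env_file_var environment variable.
# Returns None if neither is set.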
def get_credentials_from_env_var_or_file(env_var: str = "",
                                         env_file_var: str = "") -> Optional[str]:
creds = os.environ.get(env_var, None)
if creds is None:
path = os.environ.get(env_file_var, None)
if path is not None:
with open(path) as f:
creds = f.read()
return creds
# Load BigQuery credentials from either a JSON string, or
# a JSON file. Passed in via environment variables either way.
def load_credentials():
creds = get_credentials_from_env_var_or_file(
env_var="BIGQUERY_CREDENTIALS",
env_file_var="BIGQUERY_CREDENTIALS_PATH")
if creds is None:
return None
parsed = json.loads(creds)
return service_account.Credentials.from_service_account_info(parsed)
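# The credentials JSON is expected to be a standard Google Cloud service
# account key (the kind downloaded from the Cloud console), which includes
# the project_id used when constructing the BigQuery client above.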