HumanCellAtlas/metadata-schema-publisher

handler.py
import base64
import json
import os

import boto3
import requests
from botocore.exceptions import ClientError
from github import Github, GithubException

from release_prep import ReleasePreparation

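# Pushes to these refs trigger publishing; each watched branch maps below to a
# bucket environment variable, an Ingest API base URL, and a public schema
# base URL.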
BRANCH_REFS = ['refs/heads/master', 'refs/heads/staging', 'refs/heads/integration', 'refs/heads/develop']

BRANCH_CONFIG = {
    'develop': 'DEV_BUCKET',
    'integration': 'INTEGRATION_BUCKET',
    'staging': 'STAGING_BUCKET',
    'master': 'PROD_BUCKET'
}

INGEST_API = {
    'develop': 'https://api.ingest.dev.data.humancellatlas.org',
    'integration': 'https://api.ingest.integration.data.humancellatlas.org',
    'staging': 'https://api.ingest.staging.data.humancellatlas.org',
    'master': 'https://api.ingest.data.humancellatlas.org'
}

SCHEMA_URL = {
    'develop': 'https://schema.dev.data.humancellatlas.org/',
    'integration': 'https://schema.integration.data.humancellatlas.org/',
    'staging': 'https://schema.staging.data.humancellatlas.org/',
    'master': 'https://schema.humancellatlas.org/'
}

UNVERSIONED_FILES = [
    'property_migrations'
]


def _notify_ingest(branch_name):
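    """POST to this environment's Ingest API so it picks up newly published schemas."""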
    ingest_base_url = INGEST_API.get(branch_name)
    schema_update_url = f'{ingest_base_url}/schemas/update'
    r = requests.post(schema_update_url)
    r.raise_for_status()
    print('Notified Ingest!')


def get_access_token(secrets):
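    """Pull the GitHub access token out of the secrets dict, failing fast if it is missing."""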
    access_token = secrets.get('GITHUB_ACCESS_TOKEN')
    if not access_token:
        raise Exception('A GitHub access token is required to communicate with the GitHub API')
    return access_token


def on_github_push(event, context, dryrun=False):
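    """Lambda entry point for GitHub push webhooks.

    For pushes to a watched branch, walk the repository's json_schema tree,
    publish any schema files not yet in the branch's bucket, and report the
    result to Ingest and the SNS topic.
    """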
    message = _process_event(event)
    ref = message["ref"]
    secret_name = os.environ['SECRET_NAME']
    secrets = json.loads(get_secret(secret_name))
    access_token = get_access_token(secrets)

    if ref in BRANCH_REFS:
        repo_name = message["repository"]["full_name"]
        repo = Github(access_token).get_repo(repo_name)
        branch = repo.get_branch(ref)
        pusher = message["pusher"]["name"]
        notification_message = f"Commit to {ref} detected on {repo_name} branch {branch.name} by {pusher}"
        print(notification_message)
        _send_notification(notification_message, context, dryrun)
        server_path = 'json_schema'
        versions_file = repo.get_contents(server_path + "/versions.json", branch.name)
        version_numbers_str = base64.b64decode(versions_file.content).decode("utf-8")
        version_numbers = json.loads(version_numbers_str)

        result = _process_directory(repo, branch.name, server_path, server_path, version_numbers, context, dryrun)
        result_str = "\n".join(result)
        result_message = ""

        if len(result) == 0:
            result_message = result_message + "No schema changes published"
        else:
            result_message = result_message + "New schema changes published:\n" + result_str
            _notify_ingest(branch.name)
        print(result_message)
        _send_notification(result_message, context, dryrun)

    else:
        result = []

    response = {
        "statusCode": 200,
        "body": {
            "created": json.dumps(result)
        }
    }
    return response


def _process_event(event):
    message = json.loads(event["body"])
    return message


def _process_directory(repo, branch_name, base_server_path, server_path, version_numbers, context, dryrun=False):
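    """Recursively walk server_path on the given branch, publish every .json file
    found (except versions.json), and return the list of keys created."""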
    print("Processing " + server_path + " in " + branch_name + " branch of " + repo.name)
    created_list = []
    contents = repo.get_dir_contents(server_path, branch_name)
    for content in contents:
        if content.type == 'dir':
            created_list.extend(
                _process_directory(repo, branch_name, base_server_path, content.path, version_numbers, context, dryrun))
        else:
            try:
                path = content.path
                file_root, file_extension = os.path.splitext(path)
                if file_extension == '.json' and not path.endswith('versions.json'):
                    print("- processing: " + path)
                    file_content = repo.get_contents(path, branch_name)
                    data = base64.b64decode(file_content.content)
                    json_data = json.loads(data.decode('utf8'))
                    relative_path = path.replace(base_server_path + "/", "")
                    relative_path = relative_path.replace(".json", "")
                    key = None

                    if relative_path in UNVERSIONED_FILES:
                        expanded_file_data = json_data
                        key = relative_path
                    else:
                        schema_url = SCHEMA_URL.get(branch_name)
                        release_preparation = ReleasePreparation(schema_url=schema_url, version_map=version_numbers)
                        expanded_file_data = release_preparation.expand_urls(relative_path, json_data)
                        key = release_preparation.get_schema_key(expanded_file_data)

                    if key is None:
                        print("- could not find key for: " + path)
                    else:
                        created = _upload(key, branch_name, expanded_file_data, context, dryrun)
                        if created:
                            created_list.append(key)
                else:
                    print("- skipping: " + path)
            except (GithubException, IOError) as e:
                print(f'Error processing {content.path}: {e}')
    return created_list


def _upload(key, branch_name, file_data, context, dryrun=False):
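    """Write the schema to a local 'dryrun' directory, or to the branch's S3 bucket.

    Versioned schemas are treated as immutable: a key that already exists is never
    overwritten. Files in UNVERSIONED_FILES are always re-uploaded. Returns True
    when something was written, False otherwise.
    """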
    if dryrun:
        output_dir = 'dryrun'
        output_path = output_dir + '/' + key
        pos = output_path.rfind('/')
        os.makedirs(output_path[0:pos], exist_ok=True)
        with open(output_path, 'w') as outfile:
            json.dump(file_data, outfile, indent=2)
        print("Output: " + output_path)
        return True
    else:
        bucket_env_var = BRANCH_CONFIG.get(branch_name)

        bucket = os.environ[bucket_env_var]

        s3 = boto3.client('s3')

        if (not _key_exists(s3, bucket, key)) or (key in UNVERSIONED_FILES):
            try:
                s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(file_data, indent=2),
                              ContentType='application/json', ACL='public-read', CacheControl="no-cache")
                return True
            except Exception as e:
                error_message = 'Error uploading ' + key
                print(error_message, e)
                _send_notification(error_message, context, dryrun)
                return False
        else:
            return False


def _key_exists(s3, bucket, key):
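    """Return True if an object with exactly this key already exists in the bucket."""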
    response = s3.list_objects_v2(
        Bucket=bucket,
        Prefix=key,
    )
    for obj in response.get('Contents', []):
        if obj['Key'] == key:
            return True
    return False


def _send_notification(message, context, dryrun=False):
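    """Publish the message to the configured SNS topic, or just print it on a dry
    run or when the invoking account id is the placeholder 'Fake'."""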
    if dryrun:
        print("DRY RUN: " + message)
    else:
        topic_name = os.environ['TOPIC_NAME']
        account_id = context.invoked_function_arn.split(":")[4]
        if account_id != "Fake":
            print("Sending notification to " + topic_name)
            topic_arn = "arn:aws:sns:" + os.environ['AWS_REGION'] + ":" + account_id + ":" + topic_name
            sns = boto3.client(service_name="sns")
            sns.publish(
                TopicArn=topic_arn,
                Message=message
            )
        else:
            print("Skipping notification: " + message)


def sns_to_slack(event, context):
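    """Lambda entry point that forwards an SNS notification to the Slack webhook
    URL stored in Secrets Manager."""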
    print(event)
    sns = event['Records'][0]['Sns']
    message = sns['Message']

    secret_name = os.environ['SECRET_NAME']
    secret = get_secret(secret_name)
    secrets = json.loads(secret)

    webhook_url = secrets.get('SLACK_URL')

    if not webhook_url:
        raise Exception('Could not find the Slack webhook URL!')

    payload = {
        'text': message
    }
    r = requests.post(webhook_url, json=payload)
    return r.status_code


def get_secret(secret_name, region_name=None):
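    """Fetch a secret from AWS Secrets Manager, defaulting to the region named in
    the AWS_PROVIDER_REGION environment variable."""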
    if not region_name:
        region_name = os.environ['AWS_PROVIDER_REGION']

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    get_secret_value_response = client.get_secret_value(
        SecretId=secret_name
    )

    # Decrypts secret using the associated KMS CMK.
    # Depending on whether the secret is a string or binary, one of these fields will be populated.
    if 'SecretString' in get_secret_value_response:
        return get_secret_value_response['SecretString']
    else:
        return base64.b64decode(get_secret_value_response['SecretBinary'])
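

if __name__ == '__main__':
    # Minimal local smoke test of the dry-run path; an illustrative sketch, not
    # part of the deployed handler. It assumes SECRET_NAME and
    # AWS_PROVIDER_REGION are set and that the secret holds a valid
    # GITHUB_ACCESS_TOKEN. The repository and pusher names below are
    # hypothetical placeholders.
    from types import SimpleNamespace

    fake_context = SimpleNamespace(
        invoked_function_arn='arn:aws:lambda:us-east-1:Fake:function:publisher')
    fake_event = {
        'body': json.dumps({
            'ref': 'refs/heads/develop',
            'repository': {'full_name': 'HumanCellAtlas/metadata-schema'},
            'pusher': {'name': 'example-user'}
        })
    }
    print(on_github_push(fake_event, fake_context, dryrun=True))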