dragonchain/dragonchain

View on GitHub
dragonchain/job_processor/job_processor.py

Summary

Maintainability
A
1 hr
Test Coverage
A
90%
# Copyright 2020 Dragonchain, Inc.
# Licensed under the Apache License, Version 2.0 (the "Apache License")
# with the following modification; you may not use this file except in
# compliance with the Apache License and the following modification to it:
# Section 6. Trademarks. is deleted and replaced with:
#      6. Trademarks. This License does not grant permission to use the trade
#         names, trademarks, service marks, or product names of the Licensor
#         and its affiliates, except as required to comply with Section 4(c) of
#         the License and to reproduce the content of the NOTICE file.
# You may obtain a copy of the Apache License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License with the above modification is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the Apache License for the specific
# language governing permissions and limitations under the Apache License.

import json
import os
import time
from typing import Optional, Dict, cast

import fastjsonschema
import kubernetes

from dragonchain.lib import error_reporter
from dragonchain.lib.dto import schema
from dragonchain.lib.database import redis
from dragonchain import logger

# Deployment configuration, read once at import time.
# Every value read with os.environ[...] is required: a missing variable raises KeyError while this module is imported.
DRAGONCHAIN_VERSION = os.environ["DRAGONCHAIN_VERSION"]
INTERNAL_ID = os.environ["INTERNAL_ID"]
STAGE = os.environ["STAGE"]
REGISTRY = os.environ["REGISTRY"]
# Optional: when set, attempt_job_launch adds it to the job's pod as the "iam.amazonaws.com/role" annotation
IAM_ROLE = os.environ.get("IAM_ROLE")
NAMESPACE = os.environ["NAMESPACE"]
DEPLOYMENT_NAME = os.environ["DEPLOYMENT_NAME"]
# When equal to "disk", attempt_job_launch also mounts the "<DEPLOYMENT_NAME>-main-storage" volume claim at STORAGE_LOCATION
STORAGE_TYPE = os.environ["STORAGE_TYPE"]
STORAGE_LOCATION = os.environ["STORAGE_LOCATION"]
# Only the part before the last "/" is used (as the mount path of the secrets volume)
SECRET_LOCATION = os.environ["SECRET_LOCATION"]
DRAGONCHAIN_IMAGE = os.environ["DRAGONCHAIN_IMAGE"]

# Redis list that get_next_task pops incoming contract tasks from
CONTRACT_TASK_KEY = "mq:contract-task"
# Redis list a task is moved onto while it is being handled; anything left here at startup is re-queued by start()
PENDING_TASK_KEY = "mq:contract-pending"

_log = logger.get_logger()
_kube: kubernetes.client.BatchV1Api = cast(kubernetes.client.BatchV1Api, None)  # This will always be defined before starting by being set in start()
# Validator compiled once from the smart contract build task schema; get_next_task runs it on every received task
_validate_sc_build_task = fastjsonschema.compile(schema.smart_contract_build_task_schema)


def start() -> None:
    """Set up the kubernetes client, recover dropped tasks, then process tasks forever
    Loads the in-cluster kubernetes configuration, stores a batch API client in the
    module-level _kube, moves any tasks left in the pending queue back onto the task
    queue, and then calls start_task in an endless loop (this function never returns normally).
    """
    global _kube

    _log.debug("Connecting to service account")
    kubernetes.config.load_incluster_config()

    _log.debug("Creating kubernetes client")
    _kube = kubernetes.client.BatchV1Api()

    _log.debug("Job processor ready!")

    pending_count = redis.llen_sync(PENDING_TASK_KEY)
    if pending_count:
        # Items still in the pending queue were popped by an earlier run but never finished
        _log.warning("WARNING! Pending job processor queue was not empty. Last job probably crashed. Re-queueing these dropped items.")
        dropped_tasks = redis.lrange_sync(PENDING_TASK_KEY, 0, -1, decode=False)
        pipeline = redis.pipeline_sync()
        pipeline.rpush(CONTRACT_TASK_KEY, *dropped_tasks)
        pipeline.delete(PENDING_TASK_KEY)
        pipeline.execute()

    while True:
        start_task()


def get_job_name(contract_id: str) -> str:
    """Build the kubernetes job name used for a contract
    Args:
        contract_id: Id of the contract
    Returns:
        The contract id prefixed with "contract-"
    """
    job_name = "contract-{}".format(contract_id)
    return job_name


def get_job_labels(event: dict) -> Dict[str, str]:
    """Build the kubernetes labels for a smart contract job
    Args:
        event: An invocation of a smart contract job (must contain an "id" key)
    Returns:
        A dictionary of kubernetes labels for the smart contract job
    """
    contract_id = event["id"]
    labels: Dict[str, str] = {}
    labels["app.kubernetes.io/name"] = get_job_name(contract_id)
    labels["app.kubernetes.io/component"] = f"{DEPLOYMENT_NAME}-job"
    labels["app.kubernetes.io/version"] = DRAGONCHAIN_VERSION
    labels["app.kubernetes.io/instance"] = contract_id
    labels["app.kubernetes.io/part-of"] = DEPLOYMENT_NAME
    labels["dragonchainId"] = INTERNAL_ID
    labels["stage"] = STAGE
    return labels


def start_task() -> None:
    """Take the next task from the queue and launch a kubernetes job for it
    If a job for the task's contract is still active, the task is discarded.
    If a finished (succeeded or failed) job exists, it is deleted before the new
    job is launched. The task is removed from the pending queue once it has been
    discarded or launched; if an error is raised first, it stays in the pending queue.
    """
    task = get_next_task()
    if not task:
        # Nothing usable was received (get_next_task already handled any invalid task)
        return

    existing_job = get_existing_job_status(task)
    if existing_job:
        if existing_job.status.active:
            _log.warning("Throwing away task because job already exists")
            redis.lpop_sync(PENDING_TASK_KEY)
            return
        if existing_job.status.succeeded or existing_job.status.failed:
            delete_existing_job(task)

    attempt_job_launch(task)
    redis.lpop_sync(PENDING_TASK_KEY)


def get_next_task() -> Optional[dict]:
    """Wait for the next task on the job queue and move it to the pending queue
    Returns:
        The parsed and schema-validated task, or None when nothing was popped or
        the popped item could not be parsed/validated (in which case it is also
        removed from the pending queue).
    """
    _log.info("Awaiting contract task...")
    raw_task = redis.brpoplpush_sync(CONTRACT_TASK_KEY, PENDING_TASK_KEY, 0, decode=False)
    if raw_task is None:
        return None

    _log.debug(f"received task: {raw_task}")
    try:
        task = json.loads(raw_task)
        _validate_sc_build_task(task)
    except Exception:
        # A malformed task can never be processed, so drop it from the pending queue instead of leaving it there
        _log.exception("Error processing task, skipping")
        redis.lpop_sync(PENDING_TASK_KEY)
        return None
    else:
        _log.info(f"New task request received: {task}")
        return task


def get_existing_job_status(task: dict) -> Optional[kubernetes.client.V1Job]:
    """Look up the kubernetes job that may already exist for a task's contract
    Args:
        task: A smart contract build task (its "id" is used to derive the job name)
    Returns:
        The job returned by the kubernetes API, or None if the API responds with a 404
    Raises:
        RuntimeError: When the kubernetes API fails with any status other than 404
    """
    try:
        return _kube.read_namespaced_job_status(get_job_name(task["id"]), NAMESPACE)
    except kubernetes.client.rest.ApiException as e:
        if e.status == 404:
            _log.info("No existing job found")
            return None
        # Record the status and traceback; otherwise the only trace of the failure is a generic RuntimeError
        _log.exception(f"Kubernetes API error while reading existing job status. status: {e.status}")
        raise RuntimeError("Could not get existing job status") from e


def delete_existing_job(task: dict) -> Optional[kubernetes.client.V1Status]:
    """Delete the kubernetes job that exists for a task's contract
    The deletion is requested with the "Background" propagation policy.
    Args:
        task: A smart contract build task (its "id" is used to derive the job name)
    Returns:
        The response of the kubernetes delete call, or None if the API responds with a 404
    Raises:
        RuntimeError: When the kubernetes API fails with any status other than 404
    """
    _log.info("Deleting existing job")
    try:
        return _kube.delete_namespaced_job(
            get_job_name(task["id"]), NAMESPACE, body=kubernetes.client.V1DeleteOptions(propagation_policy="Background")
        )
    except kubernetes.client.rest.ApiException as e:
        if e.status == 404:
            _log.info("No existing job found")
            return None
        # Record the status and traceback; otherwise the only trace of the failure is a generic RuntimeError
        _log.exception(f"Kubernetes API error while deleting existing job. status: {e.status}")
        raise RuntimeError("Could not delete existing job") from e


def attempt_job_launch(event: dict, retry: int = 0) -> None:
    """Launch kubernetes namespaced job given a smart contract invocation
    When the kubernetes API answers with a 409, the launch is retried after a
    5 second pause, up to 5 retries. Returns normally as soon as one attempt succeeds.
    Args:
        event: An invocation of a smart contract
        retry: The retry count for recursive invocation (dont specify manually)
    Raises:
        RuntimeError: When the retries are exhausted, when the kubernetes API fails
            with a status other than 409, or when any other unexpected error occurs
    """
    if retry > 5:
        # Re-enqueue?
        _log.error("Could not launch job after 5 attempts.")
        raise RuntimeError("Failure to launch job after 5 attempts")

    _log.info("Launching kubernetes job")
    try:
        volume_mounts = [
            kubernetes.client.V1VolumeMount(name="dockersock", mount_path="/var/run/docker.sock"),
            kubernetes.client.V1VolumeMount(name="faas", mount_path="/etc/openfaas-secret", read_only=True),
            # The secrets volume is mounted at the directory part of SECRET_LOCATION (everything before the last "/")
            kubernetes.client.V1VolumeMount(name="secrets", mount_path=SECRET_LOCATION[: SECRET_LOCATION.rfind("/")], read_only=True),
        ]
        volumes = [
            kubernetes.client.V1Volume(name="dockersock", host_path=kubernetes.client.V1HostPathVolumeSource(path="/var/run/docker.sock")),
            kubernetes.client.V1Volume(name="faas", secret=kubernetes.client.V1SecretVolumeSource(secret_name="openfaas-auth")),  # nosec
            kubernetes.client.V1Volume(name="secrets", secret=kubernetes.client.V1SecretVolumeSource(secret_name=f"d-{INTERNAL_ID}-secrets")),
        ]
        if STORAGE_TYPE == "disk":
            # Disk-backed storage additionally needs the main storage volume claim mounted into the job
            volume_mounts.append(kubernetes.client.V1VolumeMount(name="main-storage", mount_path=STORAGE_LOCATION))
            volumes.append(
                kubernetes.client.V1Volume(
                    name="main-storage",
                    persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource(claim_name=f"{DEPLOYMENT_NAME}-main-storage"),
                )
            )
        annotations: Dict[str, str] = {}
        if IAM_ROLE:
            annotations["iam.amazonaws.com/role"] = IAM_ROLE

        resp = _kube.create_namespaced_job(
            namespace=NAMESPACE,
            body=kubernetes.client.V1Job(
                metadata=kubernetes.client.V1ObjectMeta(name=get_job_name(event["id"]), labels=get_job_labels(event)),
                spec=kubernetes.client.V1JobSpec(
                    completions=1,
                    parallelism=1,
                    backoff_limit=1,  # This is not respected in k8s v1.11 (https://github.com/kubernetes/kubernetes/issues/54870)
                    active_deadline_seconds=600,
                    template=kubernetes.client.V1PodTemplateSpec(
                        metadata=kubernetes.client.V1ObjectMeta(annotations=annotations, labels=get_job_labels(event)),
                        spec=kubernetes.client.V1PodSpec(
                            containers=[
                                kubernetes.client.V1Container(
                                    name=get_job_name(event["id"]),
                                    image=DRAGONCHAIN_IMAGE,
                                    security_context=kubernetes.client.V1SecurityContext(privileged=True),
                                    volume_mounts=volume_mounts,
                                    command=["sh"],
                                    args=["entrypoints/contract_job.sh"],
                                    env=[
                                        # The whole task is handed to the job as compact JSON in the EVENT variable
                                        kubernetes.client.V1EnvVar(name="EVENT", value=json.dumps(event, separators=(",", ":"))),
                                        kubernetes.client.V1EnvVar(name="SERVICE", value="contract-job"),
                                    ],
                                    env_from=[
                                        kubernetes.client.V1EnvFromSource(
                                            config_map_ref=kubernetes.client.V1ConfigMapEnvSource(name=f"{DEPLOYMENT_NAME}-configmap")
                                        )
                                    ],
                                )
                            ],
                            volumes=volumes,
                            restart_policy="Never",
                        ),
                    ),
                ),
            ),
        )
        _log.info(f"Response from API: {resp}")
    except kubernetes.client.rest.ApiException as e:
        _log.exception(f"Error thrown while starting job. status: {e.status}")
        if e.status == 409:
            # NOTE(review): 409 presumably means a job with this name still exists (e.g. a background delete has not finished) - confirm
            retry += 1
            _log.warning(f"Failed to launch! Retry attempt ({retry}/5) in 5 seconds")
            time.sleep(5)
            attempt_job_launch(event, retry=retry)
            # Reaching this line means the retry succeeded (a failed retry raises out of the call above),
            # so return instead of falling through to the raise below and reporting a launched job as failed.
            return
        raise RuntimeError(f"Error thrown while starting job. status: {e.status}") from e
    except Exception as e:
        raise RuntimeError("Unexpected error") from e


if __name__ == "__main__":
    # Script entry point: run the job processor until it fails
    try:
        start()
    except Exception as failure:
        # Report the failure before re-raising it so the process still exits with the original error
        error_reporter.report_exception(failure, "Job poller failure.")
        raise