intraxia/wp-gistpen

View on GitHub
app/Jobs/AbstractJob.php

Summary

Maintainability
C
1 day
Test Coverage
<?php

namespace Intraxia\Gistpen\Jobs;

use Intraxia\Gistpen\Contract\Job;
use Intraxia\Gistpen\Model\Message;
use Intraxia\Gistpen\Model\Run;
use Intraxia\Jaxion\Axolotl\Collection;
use Intraxia\Jaxion\Contract\Axolotl\EntityManager;
use WP_Error;

/**
 * Base Job class.
 */
abstract class AbstractJob implements Job {
    /**
     * EntityManager service.
     *
     * @var EntityManager
     */
    protected $em;

    /**
     * Run currently being processed.
     *
     * @var Run
     */
    private $current_run;

    /**
     * AbstractJob constructor.
     *
     * @param EntityManager $em
     */
    public function __construct( EntityManager $em ) {
        $this->em = $em;
    }

    /**
     * Get the Job's name.
     *
     * @return string
     */
    abstract protected function name();

    /**
     * Get the Job's slug.
     *
     * @return string
     */
    abstract protected function slug();

    /**
     * Get the Job's description.
     *
     * @return mixed
     */
    abstract protected function description();

    /**
     * Fetch all the items the Job can process.
     *
     * @return Collection|WP_Error
     */
    abstract protected function fetch_items();

    /**
     * Process a single item. Return the modified item for further
     * processing.
     *
     * @param mixed $item Queue item to iterate over.
     *
     * @return mixed|null
     */
    abstract protected function process_item( $item );

    /**
     * {@inheritdoc}
     *
     * @param Collection|null $items
     *
     * @return Run|WP_Error
     */
    public function dispatch( Collection $items = null ) {
        if ( null === $items ) {
            $items = $this->fetch_items();
        }

        if ( is_wp_error( $items ) ) {
            return $items;
        }

        if ( ! ( $items instanceof Collection ) ) {
            return new WP_Error(
                'invalid_items',
                sprintf(
                    /* translators: %s: Job ID. */
                    __( 'items passed into dispatch or returned by fetch_items for job %s is not a Collection', 'wp-gistpen' ),
                    $this->slug()
                )
            );
        }

        $run = $this->em->create( \Intraxia\Gistpen\Model\Run::class, array(
            'scheduled_at' => $this->make_timestamp(),
            'items'        => $items,
            'status'       => Status::SCHEDULED,
            'job'          => $this->slug(),
        ) );

        if ( is_wp_error( $run ) ) {
            return $run;
        }

        $this->trigger();

        return $run;
    }

    /**
     * {@inheritdoc}
     *
     * @return WP_Error|null
     */
    public function process() {
        if ( $this->is_running() ) {
            return new WP_Error(
                'job_running',
                sprintf(
                    /* translators: %s: Job ID. */
                    __( 'Job %s is already running.', 'wp-gistpen' ),
                    $this->slug()
                )
            );
        }

        $start_time = time();
        $this->set_status( Status::PROCESSING );

        do {
            $this->current_run = $this->get_next_run();

            if ( null === $this->current_run ) {
                break;
            }

            $this->start();

            $items = $this->current_run->items->to_array();

            foreach ( $items as $key => $item ) {
                $task = $this->process_item( $item );

                if ( $task ) {
                    $items[ $key ] = $task;
                } else {
                    unset( $items[ $key ] );
                }

                if ( $this->time_exceeded( $start_time ) || $this->memory_exceeded() ) {
                    break;
                }
            }

            if ( $items ) {
                $this->pause( $items );
            } else {
                $this->finish();
            }
        } while ( ! $this->time_exceeded( $start_time ) && ! $this->memory_exceeded() );

        $this->set_status( Status::IDLE );
        $this->trigger();

        return null;
    }

    /**
     * {@inheritdoc}
     *
     * @param int $run_id
     *
     * @return Run|WP_Error
     */
    public function fetch( $run_id ) {
        return $this->em->find( \Intraxia\Gistpen\Model\Run::class, $run_id );
    }

    /**
     * {@inheritdoc}
     *
     * @return Collection|WP_Error
     */
    public function runs() {
        return $this->em->find_by( \Intraxia\Gistpen\Model\Run::class, array(
            'order_by' => 'ID',
            'job'      => $this->slug(),
        ) );
    }

    /**
     * {@inheritdoc}
     *
     * @param int $run_id
     *
     * @return Run|WP_Error
     */
    public function run( $run_id ) {
        return $this->em->find( \Intraxia\Gistpen\Model\Run::class, $run_id, array(
            'job' => $this->slug(),
        ) );
    }

    /**
     * {@inheritdoc}
     *
     * @param int $run_id
     *
     * @return Collection|WP_Error
     */
    public function messages( $run_id ) {
        return $this->em->find_by( \Intraxia\Gistpen\Model\Message::class, array(
            'run_id' => $run_id,
        ) );
    }

    /**
     * {@inheritdoc}
     *
     * @return string
     */
    public function get_status() {
        return get_option( $this->make_status_key(), Status::IDLE );
    }

    /**
     * Serializes the model's public data into an array.
     *
     * @return array
     */
    public function serialize() {
        return array(
            'name'        => $this->name(),
            'slug'        => $this->slug(),
            'description' => $this->description(),
            'rest_url'    => rest_url( sprintf(
                'intraxia/v1/gistpen/jobs/%s',
                $this->slug()
            ) ),
            'runs_url'    => rest_url( sprintf(
                'intraxia/v1/gistpen/jobs/%s/runs',
                $this->slug()
            ) ),
        );
    }

    /**
     * Log a new message to the database for the current run.
     *
     * @param string $msg
     * @param string $lvl
     *
     * @return Message|WP_Error
     */
    protected function log( $msg, $lvl = Level::INFO ) {
        return $this->em->create( \Intraxia\Gistpen\Model\Message::class, array(
            'run_id'    => $this->current_run->ID,
            'text'      => $msg,
            'level'     => $lvl,
            'logged_at' => $this->make_timestamp(),
        ) );
    }

    /**
     * Create a new timestamp for the current time for mysql.
     *
     * @return string
     */
    protected function make_timestamp() {
        return current_time( 'mysql' );
    }

    /**
     * Trigger a new job if the current job isn't running
     * and another job is scheduled.
     */
    private function trigger() {
        if ( ! $this->is_running() && $this->has_next_run() ) {
            wp_remote_post( $this->process_url(), $this->get_request_args() );
        }
    }

    /**
     * Get the process URL for the job.
     *
     * @return string
     */
    private function process_url() {
        return rest_url( sprintf(
            'intraxia/v1/gistpen/jobs/%s/process',
            $this->slug()
        ) );
    }

    /**
     * Get the request arguments required by the HTTP request.
     *
     * @return array
     */
    private function get_request_args() {
        return array(
            'timeout'   => 0.01,
            'blocking'  => false,
            'cookies'   => $_COOKIE,
            'sslverify' => apply_filters( 'https_local_ssl_verify', false ),
            'headers'   => array(
                'X-WP-Nonce' => wp_create_nonce( 'wp_rest' ),
            ),
        );
    }

    /**
     * Set the current status of the Job.
     *
     * @param string $status
     */
    private function set_status( $status ) {
        if ( Status::isValid( $status ) ) {
            update_option( $this->make_status_key(), $status, false );
        }
    }

    /**
     * Create the key used by the status option.
     *
     * @return string
     */
    private function make_status_key() {
        return "_wpgp_job_{$this->slug()}_status";
    }

    /**
     * Determine whether the Job is currently running.
     *
     * @return bool
     */
    private function is_running() {
        return $this->get_status() === Status::PROCESSING;
    }

    /**
     * Update the database with the started run.
     */
    private function start() {
        if ( ! $this->current_run->started_at ) {
            $this->current_run->started_at = current_time( 'mysql' );
        }

        $this->current_run->status = Status::RUNNING;

        $this->em->persist( $this->current_run );
    }

    /**
     * Update the database with the remaining items for the paused run.
     *
     * @param array $items
     */
    private function pause( $items ) {
        $this->current_run->items  = new Collection( $this->current_run->items->get_type(), $items );
        $this->current_run->status = Status::PAUSED;

        $this->em->persist( $this->current_run );
    }

    /**
     * Update the database with the finished run.
     */
    private function finish() {
        $this->current_run->status      = Status::FINISHED;
        $this->current_run->finished_at = current_time( 'mysql' );
        $this->current_run->items       = null;

        $this->em->persist( $this->current_run );
    }

    /**
     * Determines whether the amount of time the batch job can
     * run has been exceeded.
     *
     * @param int $start_time
     *
     * @return bool
     */
    private function time_exceeded( $start_time ) {
        return time() >= $start_time + 20;
    }

    /**
     * Determines whether the amount of memory used has
     * exceeded the maximum amount allowed for the run.
     *
     * @return bool
     */
    private function memory_exceeded() {
        $memory_limit = $this->get_memory_limit() * 0.9;
        // 90% of max memory
        $current_memory = memory_get_usage( true );

        return $current_memory >= $memory_limit;
    }

    /**
     * Determine the maximum amount of memory allowed
     * for the run.
     *
     * @return int
     */
    private function get_memory_limit() {
        if ( function_exists( 'ini_get' ) ) {
            $memory_limit = ini_get( 'memory_limit' );
        } else {
            // Sensible default.
            $memory_limit = '128M';
        }

        if ( ! $memory_limit || - 1 === $memory_limit ) {
            // Unlimited, set to 32GB.
            $memory_limit = '32000M';
        }

        return intval( $memory_limit ) * 1024 * 1024;
    }

    /**
     * Retrieve the next run from the database.
     *
     * @return Run|null
     */
    private function get_next_run() {
        $runs = $this->get_paused_or_scheduled_runs();

        if ( $runs->count() === 0 ) {
            return null;
        }

        return $runs->first();
    }

    /**
     * Determine whether there are any more runs for the current job.
     *
     * @return bool
     */
    private function has_next_run() {
        $runs = $this->get_paused_or_scheduled_runs();

        return $runs->count() > 0;
    }

    /**
     * Get all the currently paused or scheduled runs.
     *
     * @return Collection|WP_Error
     */
    private function get_paused_or_scheduled_runs() {
        $runs = $this->em->find_by( \Intraxia\Gistpen\Model\Run::class, array(
            'status' => Status::PAUSED,
        ) );

        if ( $runs->count() === 0 ) {
            $runs = $this->em->find_by( \Intraxia\Gistpen\Model\Run::class, array(
                'status' => Status::SCHEDULED,
            ) );
        }

        return $runs;
    }

    /**
     * Implementation of response error logging conditional logic.
     *
     * @param  string   $error_message  Error message to log out.
     * @param  string   $auth_message   Message to log on auth failure.
     * @param  string   $client_message Message to log on client error.
     * @param  int      $id             Elent ID.
     * @param  WP_Error $response       Error object.
     */
    protected function log_response_error_impl( $error_message, $auth_message, $client_message, $id, WP_Error $response ) {
        $this->log(
            sprintf(
                $error_message,
                $id,
                $response->get_error_message()
            ),
            Level::ERROR
        );

        if ( $response->get_error_code() === 'auth_error' ) {
            $this->log(
                sprintf(
                    $auth_message,
                    $id
                ),
                Level::WARNING
            );
        }

        if ( $response->get_error_code() === 'client_error' ) {
            $this->log(
                sprintf(
                    $client_message,
                    $id
                ),
                Level::WARNING
            );
        }
    }
}