dusterio/laravel-aws-worker

View on GitHub
src/Controllers/WorkerController.php

Summary

Maintainability
A
45 mins
Test Coverage
<?php

namespace Dusterio\AwsWorker\Controllers;

use Dusterio\AwsWorker\Exceptions\MalformedRequestException;
use Dusterio\AwsWorker\Jobs\AwsJob;
use Dusterio\AwsWorker\Wrappers\WorkerInterface;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Http\Request;
use Illuminate\Queue\Worker;
use Illuminate\Contracts\Console\Kernel;
use Illuminate\Console\Scheduling\Schedule;
use Illuminate\Http\Response;

class WorkerController extends LaravelController
{
    /**
     * @var array
     */
    protected $awsHeaders = [
        'X-Aws-Sqsd-Queue', 'X-Aws-Sqsd-Msgid', 'X-Aws-Sqsd-Receive-Count'
    ];

    /**
     * @var string
     */
    const LARAVEL_SCHEDULE_COMMAND = 'schedule';

    /**
     * This method is nearly identical to ScheduleRunCommand shipped with Laravel, but since we are not interested
     * in console output we couldn't reuse it
     *
     * @param Container $laravel
     * @param Kernel $kernel
     * @param Schedule $schedule
     * @param Request $request
     * @return array
     */
    public function schedule(Container $laravel, Kernel $kernel, Schedule $schedule, Request $request)
    {
        $command = $request->headers->get('X-Aws-Sqsd-Taskname', $this::LARAVEL_SCHEDULE_COMMAND);
        if ($command != $this::LARAVEL_SCHEDULE_COMMAND) return $this->runSpecificCommand($kernel, $request->headers->get('X-Aws-Sqsd-Taskname'));

        $kernel->bootstrap();
        
        $events = $schedule->dueEvents($laravel);
        $eventsRan = 0;
        $messages = [];

        foreach ($events as $event) {
            if (method_exists($event, 'filtersPass') && (new \ReflectionMethod($event, 'filtersPass'))->isPublic() && ! $event->filtersPass($laravel)) {
                continue;
            }

            $messages[] = 'Running: '.$event->getSummaryForDisplay();

            $event->run($laravel);

            ++$eventsRan;
        }

        if (count($events) === 0 || $eventsRan === 0) {
            $messages[] = 'No scheduled commands are ready to run.';
        }

        return $this->response($messages);
    }

    /**
     * @param string $command
     * @return array
     */
    protected function parseCommand($command)
    {
        $elements = explode(' ', $command);
        $name = $elements[0];
        if (count($elements) == 1) return [$name, []];

        array_shift($elements);
        $arguments = [];

        array_map(function($parameter) use (&$arguments) {
            if (strstr($parameter, '=')) {
                $parts = explode('=', $parameter);
                $arguments[$parts[0]] = $parts[1];
                return;
            }

            $arguments[$parameter] = true;
        }, $elements);

        return [
            $name,
            $arguments
        ];
    }

    /**
     * @param Kernel $kernel
     * @param $command
     * @return Response
     */
    protected function runSpecificCommand(Kernel $kernel, $command)
    {
        list ($name, $arguments) = $this->parseCommand($command);

        $exitCode = $kernel->call($name, $arguments);

        return $this->response($exitCode);
    }

    /**
     * @param Request $request
     * @param WorkerInterface $worker
     * @param Container $laravel
     * @param ExceptionHandler $exceptions
     * @return Response
     */
    public function queue(Request $request, WorkerInterface $worker, Container $laravel, ExceptionHandler $exceptions)
    {
        //$this->validateHeaders($request);
        $body = $this->validateBody($request, $laravel);

        $job = new AwsJob($laravel, $request->header('X-Aws-Sqsd-Queue'), [
            'Body' => $body,
            'MessageId' => $request->header('X-Aws-Sqsd-Msgid'),
            'ReceiptHandle' => false,
            'Attributes' => [
                'ApproximateReceiveCount' => $request->header('X-Aws-Sqsd-Receive-Count'),
                'SentTimestamp' => $request->headers->has('X-Aws-Sqsd-First-Received-At') ? strtotime($request->header('X-Aws-Sqsd-First-Received-At')) * 1000 : null
            ]
        ]);

        $worker->process(
            $request->header('X-Aws-Sqsd-Queue'), $job, array_merge([
                'delay' => 0
            ],  $this->tryToExtractOptions($body))
        );

        return $this->response([
            'Processed ' . $job->getJobId()
        ]);
    }

    /**
     * @param $input
     * @return int
     */
    private function tryToExtractOptions($input)
    {
        $parameters = [
            'maxTries' => 0,
            'timeout' => 60
        ];

        // Try to extract listener class options from the serialized job body
        if (preg_match('/tries\\\\\\\\\\\\";i:(?<attempts>\d+);/', $input, $matches)) {
            $parameters['maxTries'] = intval($matches['attempts']);
        }

        if (preg_match('/timeout\\\\\\\\\\\\";i:(?<timeout>\d+);/', $input, $matches)) {
            $parameters['timeout'] = intval($matches['timeout']);
        }

        return $parameters;
    }

    /**
     * @param Request $request
     * @throws MalformedRequestException
     */
    private function validateHeaders(Request $request)
    {
        foreach ($this->awsHeaders as $header) {
            if (! $this->hasHeader($request, $header)) {
                throw new MalformedRequestException('Missing AWS header: ' . $header);
            }
        }
    }

    /**
     * @param Request $request
     * @param $header
     * @return bool
     */
    private function hasHeader(Request $request, $header)
    {
        if (method_exists($request, 'hasHeader')) {
            return $request->hasHeader($header);
        }

        return $request->header($header, false);
    }

    /**
     * @param Request $request
     * @param Container $laravel
     * @return string
     * @throws MalformedRequestException
     */
    private function validateBody(Request $request, Container $laravel)
    {
        if (empty($request->getContent())) {
            throw new MalformedRequestException('Empty request body');
        }

        $job = json_decode($request->getContent(), true);
        if ($job === null) throw new MalformedRequestException('Unable to decode request JSON');

        if (isset($job['job']) && isset($job['data'])) return $request->getContent();

        // If the format is not the standard Laravel format, try to mimic it
        $queueId = explode('/', $request->header('X-Aws-Sqsd-Queue'));
        $queueId = array_pop($queueId);

        $class = (array_key_exists($queueId, $laravel['config']->get('sqs-plain.handlers')))
            ? $laravel['config']->get('sqs-plain.handlers')[$queueId]
            : $laravel['config']->get('sqs-plain.default-handler');

        return json_encode([
            'job' => $class . '@handle',
            'data' => $request->getContent()
        ]);
    }
    
    /**
     * @param array $messages
     * @param int $code
     * @return Response
     */
    private function response($messages = [], $code = 200)
    {
        return new Response(json_encode([
            'message' => $messages,
            'code' => $code
        ]), $code);
    }
}