SAREhub/php_component_worker

View on GitHub
src/SAREhub/Component/Worker/ZmqWorkerRunnerBootstrap.php

Summary

Maintainability
A
0 mins
Test Coverage
<?php

namespace SAREhub\Component\Worker;

use Respect\Validation\Validator;
use SAREhub\Component\Worker\Command\ZmqCommandInputServiceFactory;

class ZmqWorkerRunnerBootstrap {
    
    /**
     * @var Worker
     */
    private $worker;
    
    /**
     * @var ZmqCommandInputServiceFactory
     */
    private $commandInputServiceFactory;
    
    private $loggerFactory;
    
    public static function newInstance() {
        return new self();
    }
    
    /**
     * @param Worker $worker
     * @return $this
     */
    public function withWorker(Worker $worker) {
        $this->worker = $worker;
        return $this;
    }
    
    /**
     * @param ZmqCommandInputServiceFactory $factory
     * @return $this
     */
    public function withCommandInputServiceFactory(ZmqCommandInputServiceFactory $factory) {
        $this->commandInputServiceFactory = $factory;
        return $this;
    }
    
    /**
     * @param callable $factory
     * @return $this
     */
    public function withLoggerFactory(callable $factory) {
        $this->loggerFactory = $factory;
        return $this;
    }
    
    /**
     * Creates new worker runner for worker with command input service.
     * @return WorkerRunner
     */
    public function create() {
        $this->checkSetup();
        $commandInputServiceFactory = $this->getCommandInputServiceFactory();
        $runner = WorkerRunner::newInstance()
          ->withWorker($this->worker)
          ->usePcntl()
          ->withCommandInput($commandInputServiceFactory->createCommandInput())
          ->withCommandReplyOutput($commandInputServiceFactory->createCommandReplyOutput());
        $this->registerLoggers($runner);
        return $runner;
    }
    
    /**
     * Wrapper method for running worker runner loop.
     * @param WorkerRunner $runner
     * @param int $usleep Amount of time for sleep in microsecounds.
     */
    public static function runInLoop(WorkerRunner $runner, $usleep = 100) {
        $runner->start();
        while ($runner->isRunning()) {
            $runner->tick();
            usleep($usleep);
        }
        
        $runner->stop();
    }
    
    private function checkSetup() {
        $v = Validator::notEmpty();
        $v->setName('worker')->assert($this->worker);
        $v->setName('commandInputServiceFactory')->assert($this->commandInputServiceFactory);
        
    }
    
    private function getCommandInputServiceFactory() {
        return $this->commandInputServiceFactory
          ->withCommandInputTopic($this->worker->getId());
    }
    
    private function registerLoggers(WorkerRunner $runner) {
        if ($this->loggerFactory) {
            $workerId = $this->worker->getId();
            $runner->setLogger($this->createLogger('runner_'.$workerId));
            $this->worker->setLogger($this->createLogger('worker_'.$workerId));
        }
    }
    
    private function createLogger($name) {
        $factory = $this->loggerFactory;
        return $factory($name);
    }
}