stk2k/eventstream

View on GitHub
src/EventChannel.php

Summary

Maintainability
A
25 mins
Test Coverage
<?php
declare(strict_types=1);

namespace stk2k\eventstream;

use \RuntimeException;

use stk2k\eventstream\emitter\SimpleEventEmitter;
use stk2k\eventstream\exception\EventSourceIsNotPushableException;
use stk2k\eventstream\source\SimpleEventSource;

class EventChannel
{
    /** @var EventSourceInterface */
    private $source;
    
    /** @var EventEmitterInterface */
    private $emitter;

    /** @var bool */
    private $auto_flush;

    /**
     * construct
     *
     * @param EventSourceInterface $source
     * @param EventEmitterInterface $emitter
     */
    public function __construct(EventSourceInterface $source = null, EventEmitterInterface $emitter = null)
    {
        $this->source = $source ? $source : new SimpleEventSource();
        $this->emitter = $emitter ? $emitter : new SimpleEventEmitter();
        $this->auto_flush = false;
    }

    /**
     * change event source
     *
     * @param EventSourceInterface $source
     *
     * @return EventChannel
     */
    public function source(EventSourceInterface $source) : self
    {
        $this->source = $source;
        return $this;
    }

    /**
     * get event source
     *
     * @return EventSourceInterface
     */
    public function getSource() : EventSourceInterface
    {
        return $this->source;
    }

    /**
     * change event emitter
     *
     * @param EventEmitterInterface $emitter
     *
     * @return EventChannel
     */
    public function emitter($emitter) : self
    {
        $this->emitter = $emitter;
        return $this;
    }

    /**
     * get event emitter
     *
     * @return EventEmitterInterface
     */
    public function getEmitter() : EventEmitterInterface
    {
        return $this->emitter;
    }

    /**
     * store event
     *
     * @param Event $event
     *
     * @return EventChannel
     *
     * @throws EventSourceIsNotPushableException, OverflowException
     */
    public function push(Event $event) : self
    {
        if ($this->source){
            if (!$this->source->canPush()){
                throw new EventSourceIsNotPushableException();
            }
            $this->source->push($event);
            if ($this->auto_flush){
                $this->flush();
            }
        }
        return $this;
    }

    /**
     * flush stream
     *
     * @return EventChannel
     */
    public function flush(){
        if (!$this->source || !$this->emitter){
            return $this;
        }
        while($e = $this->source->next()){
            if ($e instanceof Event){
                $this->emitter->emit($e);
            }
            else{
                throw new RuntimeException('event source returns invalid event:' . print_r($e,true));
            }
        }
        return $this;
    }

    /**
     * listen event
     *
     * @param string $event
     * @param callable $listener
     *
     * @return EventChannel
     */
    public function listen(string $event, callable $listener) : self
    {
        if (!$this->emitter){
            return $this;
        }
        $this->emitter->listen($event, $listener);
        return $this;
    }

    /**
     * Update auto flush flags in all channels
     *
     * @param bool $auto_flush
     *
     * @return EventChannel
     */
    public function setAutoFlush(bool $auto_flush) : self
    {
        $this->auto_flush = $auto_flush;
        return $this;
    }
}