components/ChannelComponent.php
<?php
namespace mirocow\queue\components;
use mirocow\queue\exceptions\ChannelException;
use mirocow\queue\interfaces\ChannelInterface;
use mirocow\queue\models\MessageModel;
use yii\base\Component;
use yii\base\InvalidConfigException;
/**
 * Class ChannelComponent
 *
 * Queue channel component: a thin layer that forwards push/pop/purge/release/
 * delete/status calls to the configured driver, addressing it with a name of
 * the form "<queueName>:<channelName>".
 *
 * @package mirocow\queue
 */
class ChannelComponent extends Component implements ChannelInterface
{
    /**
     * @event PushEvent triggered in push() before the message is validated and handed to the driver.
     */
    const EVENT_BEFORE_PUSH = 'beforePush';

    /**
     * @event PushEvent triggered in push() after the driver accepted the message.
     */
    const EVENT_AFTER_PUSH = 'afterPush';

    /**
     * Driver used by this channel.
     *
     * Before init() this may be an object configuration array, a class name,
     * the id of an application component, or an already built driver object.
     * After init() it holds the driver object.
     *
     * @var \mirocow\queue\interfaces\DriverInterface|array|string
     */
    public $driver;

    /**
     * @var string first part of the name produced by getQueueName()
     */
    public $queueName = 'queue';

    /**
     * @var string second part of the name produced by getQueueName()
     */
    public $channelName = "default";

    /**
     * NOTE(review): within this class the value is only ever reset by push(); it is never read.
     *
     * @var int
     */
    public $pushDelay = 0;

    /**
     * NOTE(review): within this class the value is only ever reset by push(); it is never read.
     *
     * @var mixed|null
     */
    public $pushPriority = null;

    /**
     * Resolves the configured driver into an object.
     *
     * @throws InvalidConfigException when the driver is neither an object, an
     * array configuration, an existing class name nor a known application component id
     */
    public function init()
    {
        parent::init();

        $driver = $this->driver;

        // An already instantiated driver needs no resolution. Without this guard the
        // string-only checks below would receive an object.
        if (is_object($driver)) {
            return;
        }

        if (is_array($driver) || (is_string($driver) && class_exists($driver))) {
            $this->driver = \Yii::createObject($driver);
            return;
        }

        if (is_string($driver) && \Yii::$app->has($driver)) {
            $this->driver = \yii\di\Instance::ensure(
                \Yii::$app->{$driver}
            );
            return;
        }

        throw new InvalidConfigException(
            'The "driver" property must be a driver object, an object configuration array, '
            . 'a class name or an application component id.'
        );
    }

    /**
     * Builds the name under which the driver is addressed.
     *
     * @param string|null $queue queue name to use instead of $queueName; any falsy value falls back to $queueName
     * @return string "<queue>:<channelName>"
     */
    public function getQueueName($queue = null)
    {
        return ($queue ?: $this->queueName) . ':' . $this->channelName;
    }

    /**
     * Creates a new channel from the given options.
     *
     * Exceptions raised during creation propagate unchanged, so their message
     * and trace are kept.
     *
     * NOTE(review): the object built from $options is passed to the constructor
     * as its configuration argument — confirm that this is the intended way to
     * seed the new channel.
     *
     * @param array $options object configuration passed to \Yii::createObject()
     * @return ChannelComponent
     * @throws InvalidConfigException
     */
    public function create(array $options)
    {
        return new self(\Yii::createObject($options));
    }

    /**
     * Open channel connection.
     *
     * Intentionally empty in this class.
     */
    public function open()
    {
    }

    /**
     * Close channel connection.
     *
     * Intentionally empty in this class.
     */
    public function close()
    {
    }

    /**
     * Send data to channel connection.
     *
     * Intentionally empty in this class.
     *
     * @param string $name
     * @param mixed $data
     */
    public function send($name, $data)
    {
    }

    /**
     * Validates the message and pushes its JSON form to the driver.
     *
     * EVENT_BEFORE_PUSH is triggered before validation, EVENT_AFTER_PUSH only
     * after the driver call returned.
     *
     * @param MessageModel $message
     * @param int $delay passed through to the driver
     * @param mixed|null $pushPriority passed through to the driver
     * @return mixed whatever the driver's push() returns
     * @throws ChannelException when the message fails validation
     */
    public function push(MessageModel $message, $delay = 0, $pushPriority = null)
    {
        $event = new PushEvent(
            [
                'message' => $message,
                'delay' => $delay,
                'priority' => $pushPriority,
            ]
        );

        // Kept from the original implementation: both properties are reset on every push.
        $this->pushDelay = 0;
        $this->pushPriority = null;

        $this->trigger(self::EVENT_BEFORE_PUSH, $event);

        // The type hint already guarantees a MessageModel, so only validity is checked.
        if (!$message->validate()) {
            throw new ChannelException("message is not valid MessageModel");
        }

        $return = $this->driver->push($message->toJSON(), $this->getQueueName(), $delay, $pushPriority);

        $this->trigger(self::EVENT_AFTER_PUSH, $event);

        return $return;
    }

    /**
     * Pops message from the queue.
     *
     * The popped entry is deleted from the driver once it has been loaded.
     *
     * @return MessageModel|null null when the driver returned nothing usable
     * or the entry id is not numeric
     */
    public function pop()
    {
        $queueName = $this->getQueueName();

        $popped = $this->driver->pop($queueName);
        if (!$popped) {
            return null;
        }

        list($id, $rawMessage) = $popped;
        if (!is_numeric($id)) {
            return null;
        }

        $message = MessageModel::loadRawMessage($rawMessage);
        $this->driver->delete($this->getQueueName(), $id);

        return $message;
    }

    /**
     * Purge the queue.
     *
     * @return mixed whatever the driver's purge() returns
     */
    public function purge()
    {
        return $this->driver->purge($this->getQueueName());
    }

    /**
     * Release the message.
     *
     * NOTE(review): unlike the other driver calls, no queue name is passed here —
     * confirm against the driver interface.
     *
     * @param array $message
     * @param int $delay
     * @return mixed whatever the driver's release() returns
     */
    public function release(array $message, $delay = 0)
    {
        return $this->driver->release($message, $delay);
    }

    /**
     * Delete the message.
     *
     * @param int|null $id
     * @return mixed whatever the driver's delete() returns
     */
    public function delete(int $id = null)
    {
        return $this->driver->delete($this->getQueueName(), $id);
    }

    /**
     * Asks the driver for a status on this channel's queue.
     *
     * @param mixed|null $id passed through to the driver
     * @return mixed whatever the driver's status() returns
     */
    public function status($id = null)
    {
        return $this->driver->status($this->getQueueName(), $id);
    }
}