plugins/rabbitmq/classes/yf_wrapper_rabbitmq.class.php
<?php
/**
* Rabbitmq API wrapper.
*
* EXAMPLES: https://github.com/pdezwart/php-amqp/tree/master/tests
* EXAMPLES: https://otokarev.com/2014/11/25/rabbitmq-primer-realizatsii-rpc-na-php/
*/
class yf_wrapper_rabbitmq
{
    /** Active driver: 'pecl' (amqp extension), 'amqplib' (php-amqplib), or null when no driver is usable. */
    public $driver = 'pecl'; // pecl | amqplib
    /** Routing key passed to publish() by the pecl code paths. */
    public $routing_key = 'mykey';
    private $host = 'localhost';
    private $port = 5672;
    private $login = 'user';
    private $password = 'password';
    private $vhost = '/';
    // Not read anywhere in this class; kept as-is in case external tooling relies on it.
    private $queue = false;
    /** Cached connection object, created lazily by connect(). */
    private $_connection = null;
    /** Cached channel object, created lazily by init_channel(). */
    private $_channel = null;
    /** Exchange objects declared so far, keyed by exchange name. */
    private $_exchange = [];
    /** Queue objects declared so far, keyed by queue name. */
    private $_queue = [];

    /**
     * A clone must open its own connection: every cached handle (channel,
     * exchanges, queues) is tied to the original's connection, so all of them
     * are dropped, not just the connection itself.
     */
    public function __clone()
    {
        $this->_connection = null;
        $this->_channel = null;
        $this->_exchange = [];
        $this->_queue = [];
    }

    /**
     * Apply connection settings and verify that the selected driver is usable.
     *
     * Recognised keys: host, port, login, password, vhost, routing_key.
     * When the 'pecl' driver is selected but the amqp extension is not loaded,
     * the driver is set to null so later calls become no-ops instead of fatals.
     *
     * @param mixed $params
     * @return $this
     */
    public function conf($params = [])
    {
        if ($this->driver === 'pecl' && ! extension_loaded('amqp')) {
            $this->driver = null;
            // $this->driver = 'amqplib';
        }
        if (is_array($params) && ! empty($params)) {
            foreach (['host', 'port', 'login', 'password', 'vhost', 'routing_key'] as $key) {
                if (isset($params[$key])) {
                    $this->$key = $params[$key];
                }
            }
        }
        return $this;
    }

    /**
     * Open the connection on first use and return it; later calls return the cached one.
     *
     * @param mixed $params optional settings, forwarded to conf()
     * @return mixed connection object, or null when no driver is available
     */
    public function connect($params = [])
    {
        if ($this->_connection) {
            return $this->_connection;
        }
        // Always run conf(): besides applying $params it performs the driver
        // availability check, which must happen before AMQPConnection is touched.
        $this->conf($params);
        if ($this->driver === 'pecl') {
            $cnn = new AMQPConnection([
                'host' => $this->host,
                'port' => $this->port,
                'login' => $this->login,
                'password' => $this->password,
                'vhost' => $this->vhost,
            ]);
            $cnn->connect();
            $this->_connection = $cnn;
        } elseif ($this->driver === 'amqplib') {
            require_php_lib('amqplib');
            // NOTE(review): newer php-amqplib releases may only ship AMQPStreamConnection - confirm the bundled version.
            $this->_connection = new \PhpAmqpLib\Connection\AMQPConnection(
                $this->host,
                $this->port,
                $this->login,
                $this->password,
                $this->vhost
            );
        }
        return $this->_connection;
    }

    /**
     * Close the connection (if any) and forget every cached handle so that a
     * later connect() starts from a clean state.
     *
     * @return mixed result of the amqplib connection close(), otherwise null
     */
    public function disconnect()
    {
        $cnn = $this->_connection;
        $ch = $this->_channel;
        // Reset state first so the object is reusable even if closing throws.
        $this->_connection = null;
        $this->_channel = null;
        $this->_exchange = [];
        $this->_queue = [];
        if ( ! $cnn) {
            return null;
        }
        if ($this->driver === 'pecl') {
            $cnn->disconnect();
        } elseif ($this->driver === 'amqplib') {
            // Close the channel we actually used instead of opening a fresh one.
            if ($ch) {
                $ch->close();
            }
            return $cnn->close();
        }
        return null;
    }

    /**
     * Return the shared channel, creating it (and the connection) on first use.
     *
     * @param mixed $params optional settings, forwarded to connect()
     * @return mixed channel object, or null when no driver is available
     */
    public function init_channel($params = [])
    {
        if ($this->_channel) {
            return $this->_channel;
        }
        if ($this->driver !== 'pecl' && $this->driver !== 'amqplib') {
            return null;
        }
        // connect() may downgrade the driver to null (extension missing),
        // so the driver is re-checked only after it returns.
        $cnn = $this->connect($params);
        if ( ! $cnn) {
            return null;
        }
        if ($this->driver === 'pecl') {
            $this->_channel = new AMQPChannel($cnn);
        } elseif ($this->driver === 'amqplib') {
            $this->_channel = $cnn->channel();
        }
        return $this->_channel;
    }

    /**
     * Declare an exchange once and return the cached exchange object.
     *
     * Only implemented for the pecl driver. The type comes from
     * $params['exchange_type'] and falls back to fanout.
     *
     * @param mixed $name
     * @param mixed $params
     * @return mixed exchange object, or null when nothing was declared
     */
    public function init_exchange($name, $params = [])
    {
        if ( ! $name) {
            return null;
        }
        if ( ! isset($this->_exchange[$name])) {
            if ($this->driver === 'pecl') {
                $ch = $this->init_channel($params);
                if ($ch) {
                    $type = ! empty($params['exchange_type']) ? $params['exchange_type'] : AMQP_EX_TYPE_FANOUT;
                    $ex = new AMQPExchange($ch);
                    $ex->setName($name);
                    $ex->setType($type);
                    $ex->declareExchange();
                    $this->_exchange[$name] = $ex;
                }
            } elseif ($this->driver === 'amqplib') {
                // TODO
            }
        }
        return isset($this->_exchange[$name]) ? $this->_exchange[$name] : null;
    }

    /**
     * Declare a queue once and return the cached queue object.
     *
     * Only implemented for the pecl driver. Flags come from
     * $params['queue_flags'] and fall back to AMQP_DURABLE.
     *
     * @param mixed $name
     * @param mixed $params
     * @return mixed queue object, or null when nothing was declared
     */
    public function init_queue($name, $params = [])
    {
        if ( ! $name) {
            return null;
        }
        if ( ! isset($this->_queue[$name])) {
            if ($this->driver === 'pecl') {
                $ch = $this->init_channel($params);
                if ($ch) {
                    $flags = ! empty($params['queue_flags']) ? $params['queue_flags'] : AMQP_DURABLE;
                    $q = new AMQPQueue($ch);
                    $q->setName($name);
                    $q->setFlags($flags);
                    $q->declareQueue();
                    $this->_queue[$name] = $q;
                }
            }
        }
        return isset($this->_queue[$name]) ? $this->_queue[$name] : null;
    }

    /**
     * Try to connect if needed and report whether a connection exists.
     *
     * @return bool
     */
    public function is_ready()
    {
        if ( ! $this->_connection) {
            $this->connect();
        }
        return (bool) $this->_connection;
    }

    /**
     * Publish a message to the exchange named $queue (pecl driver only).
     *
     * NOTE(review): nothing in this class binds a queue to that exchange, so
     * delivery to get() presumably relies on a binding created elsewhere - confirm.
     *
     * @param mixed $queue
     * @param mixed $what
     * @return bool false on invalid input or when the exchange is unavailable, true otherwise
     */
    public function add($queue, $what)
    {
        if ( ! $queue || ! strlen($what)) {
            return false;
        }
        if ( ! $this->_connection) {
            $this->connect();
        }
        if ($this->driver === 'pecl') {
            $ex = $this->init_exchange($queue);
            if ( ! $ex) {
                return false;
            }
            $ex->publish($what, $this->routing_key);
        } elseif ($this->driver === 'amqplib') {
            // TODO
        }
        return true;
    }

    /**
     * Fetch one message from the queue named $queue with auto-ack (pecl driver only).
     *
     * @param mixed $queue
     * @return mixed false on empty queue name; whatever the driver's get() returns;
     *               null when the driver cannot serve the request
     */
    public function get($queue)
    {
        if ( ! $queue) {
            return false;
        }
        if ( ! $this->_connection) {
            $this->connect();
        }
        // Defined up front so non-pecl drivers return null instead of an undefined variable.
        $msg = null;
        if ($this->driver === 'pecl') {
            $q = $this->init_queue($queue);
            if ($q) {
                $msg = $q->get(AMQP_AUTOACK);
            }
        } elseif ($this->driver === 'amqplib') {
            // TODO
        }
        return $msg;
    }

    /**
     * Publish new event.
     *
     * @param mixed $channel exchange name
     * @param mixed $what message body
     * @return false|null false on invalid input or when the exchange is unavailable; no value otherwise
     */
    public function pub($channel, $what)
    {
        if ( ! $channel || ! strlen($what)) {
            return false;
        }
        if ( ! $this->_connection) {
            $this->connect();
        }
        if ($this->driver === 'pecl') {
            $ex = $this->init_exchange($channel);
            if ( ! $ex) {
                return false;
            }
            $ex->publish($what, $this->routing_key);
        } elseif ($this->driver === 'amqplib') {
            // Reuse the cached channel: opening a new one per publish leaks channels.
            $ch = $this->init_channel();
            $ch->exchange_declare($channel, 'fanout', false, false, false);
            $msg = new \PhpAmqpLib\Message\AMQPMessage($what);
            $ch->basic_publish($msg, $channel);
        }
    }

    /**
     * Subscribe for one or more events.
     *
     * pecl: consumes from the queue named $channel and returns the result of consume().
     * amqplib: declares a fanout exchange plus an exclusive server-named queue bound to it,
     * then blocks dispatching messages to $callback while the channel has callbacks.
     *
     * @param mixed $channel
     * @param mixed $callback
     * @return mixed false on invalid input or when the queue is unavailable
     */
    public function sub($channel, $callback)
    {
        if ( ! $channel || ! is_callable($callback)) {
            return false;
        }
        if ( ! $this->_connection) {
            $this->connect();
        }
        if ($this->driver === 'pecl') {
            $q = $this->init_queue($channel);
            if ( ! $q) {
                return false;
            }
            return $q->consume($callback, AMQP_AUTOACK);
        } elseif ($this->driver === 'amqplib') {
            $ch = $this->_connection->channel();
            $ch->exchange_declare($channel, $e_type = 'fanout', $e_passive = false, $e_durable = false, $e_auto_delete = false);
            list($queue_name) = $ch->queue_declare($q_name = '', $q_passive = false, $q_durable = false, $q_exclusive = true, $q_auto_delete = false, $q_nowait = false, $qparams = []);
            $ch->queue_bind($queue_name, $channel);
            $ch->basic_consume($queue_name, $c_tag = '', $c_no_local = false, $c_no_ack = true, $c_exclusive = false, $c_nowait = false, $callback);
            while (count((array) $ch->callbacks)) {
                // Wait on the channel object; $channel is only the exchange name.
                $ch->wait();
            }
        }
    }
}