diff --git a/common/components/RabbitMQ/Component.php b/common/components/RabbitMQ/Component.php new file mode 100644 index 0000000..89da8f8 --- /dev/null +++ b/common/components/RabbitMQ/Component.php @@ -0,0 +1,177 @@ + + * + * @property AMQPStreamConnection $connection AMQP connection. + * @property AMQPChannel $channel AMQP channel. + */ +class Component extends \yii\base\Component { + + const TYPE_TOPIC = 'topic'; + const TYPE_DIRECT = 'direct'; + const TYPE_HEADERS = 'headers'; + const TYPE_FANOUT = 'fanout'; + + /** + * @var AMQPStreamConnection + */ + protected $amqpConnection; + + /** + * @var AMQPChannel[] + */ + protected $channels = []; + + /** + * @var string + */ + public $host = '127.0.0.1'; + + /** + * @var integer + */ + public $port = 5672; + + /** + * @var string + */ + public $user; + + /** + * @var string + */ + public $password; + + /** + * @var string + */ + public $vhost = '/'; + + /** + * @inheritdoc + */ + public function init() { + parent::init(); + if (empty($this->user)) { + throw new Exception("Parameter 'user' was not set for AMQP connection."); + } + } + + /** + * @return AMQPStreamConnection + */ + public function getConnection() { + if (!$this->amqpConnection) { + $this->amqpConnection = new AMQPStreamConnection( + $this->host, + $this->port, + $this->user, + $this->password, + $this->vhost + ); + } + + return $this->amqpConnection; + } + + /** + * @param string $channel_id + * @return AMQPChannel + */ + public function getChannel($channel_id = null) { + $index = $channel_id ?: 'default'; + if (!array_key_exists($index, $this->channels)) { + $this->channels[$index] = $this->getConnection()->channel($channel_id); + } + + return $this->channels[$index]; + } + + // TODO: метод sendToQueue + + /** + * Sends message to the exchange. + * + * @param string $exchangeName + * @param string $routingKey + * @param string|array $message + * @param array $exchangeArgs + * @param array $publishArgs + */ + public function sendToExchange($exchangeName, $routingKey, $message, $exchangeArgs = [], $publishArgs = []) { + $message = $this->prepareMessage($message); + $channel = $this->getChannel(); + call_user_func_array([$channel, 'exchange_declare'], $this->prepareExchangeArgs($exchangeName, $exchangeArgs)); + call_user_func_array([$channel, 'basic_publish'], $this->preparePublishArgs($message, $exchangeName, $routingKey, $publishArgs)); + } + + /** + * Объединяет переданный набор аргументов с поведением по умолчанию + * + * @param string $exchangeName + * @param array $args + * @return array + */ + protected function prepareExchangeArgs($exchangeName, array $args) { + return array_replace([ + $exchangeName, + self::TYPE_FANOUT, + false, + false, + false, + ], $args); + } + + /** + * Объединяет переданный набор аргументов с поведением по умолчанию + * + * @param AMQPMessage $message + * @param string $exchangeName + * @param string $routeKey + * @param array $args + * + * @return array + */ + protected function preparePublishArgs($message, $exchangeName, $routeKey, array $args) { + return array_replace([ + $message, + $exchangeName, + $routeKey, + ], $args); + } + + /** + * Returns prepaired AMQP message. + * + * @param string|array|object $message + * @param array $properties + * @return AMQPMessage + * @throws Exception If message is empty. + */ + public function prepareMessage($message, $properties = null) { + if ($message instanceof AMQPMessage) { + return $message; + } + + if (empty($message)) { + throw new Exception('AMQP message can not be empty'); + } + + if (is_array($message) || is_object($message)) { + $message = Json::encode($message); + } + + return new AMQPMessage($message, $properties); + } + +} diff --git a/common/components/RabbitMQ/Controller.php b/common/components/RabbitMQ/Controller.php new file mode 100644 index 0000000..7334a6c --- /dev/null +++ b/common/components/RabbitMQ/Controller.php @@ -0,0 +1,179 @@ +configureListen(); + } + + /** + * Имя exchange, который будет прослушивать этот интерпретатор + * + * @return string + */ + abstract public function getExchangeName(); + + /** + * Есть метод вернёт null, то будет создана временная очередь, которая будет автоматически удалена + * после завершения процесса её обработчика + * + * @return null|string + */ + public function getQueueName() { + return null; + } + + /** + * @return Component + */ + protected function getAmqp() { + return Yii::$app->get('amqp'); + } + + protected function configureListen() { + $exchangeName = $this->getExchangeName(); + $connection = $this->getAmqp()->getConnection(); + $channel = $this->getAmqp()->getChannel(); + call_user_func_array([$channel, 'exchange_declare'], $this->getExchangeDeclareArgs()); + list($queueName) = call_user_func_array([$channel, 'queue_declare'], $this->getQueueDeclareArgs()); + // TODO: нужно продумать механизм для подписки на множество роутов + call_user_func_array([$channel, 'queue_bind'], $this->getQueueBindArgs($exchangeName, $queueName)); + call_user_func_array([$channel, 'basic_consume'], $this->getBasicConsumeArgs($queueName)); + $channel->basic_qos(null, 1, true); + + while(count($channel->callbacks)) { + $channel->wait(); + } + + $channel->close(); + $connection->close(); + } + + public function callback(AMQPMessage $msg) { + $routingKey = $msg->delivery_info['routing_key']; + $method = 'route' . Inflector::camelize($routingKey); + $body = Json::decode($msg->body, true); + + if (!method_exists($this, $method)) { + $this->log( + sprintf('Unknown routing key "%s" for exchange "%s".', $routingKey, $this->getExchangeName()), + static::MESSAGE_ERROR + ); + + $this->log( + print_r($body, true), + static::MESSAGE_INFO + ); + } + + // Инверсия значения, т.к. параметр называется no_ack, то есть уже инвертирован + $isAckRequired = !ArrayHelper::getValue($this->getBasicConsumeArgs($this->getQueueName()), 3, true); + $result = $this->getResult($method, $body, $msg); + if ($isAckRequired) { + if ($result === false) { + $this->reject($msg, true); + } else { + $this->ack($msg); + } + } + } + + private function getResult($method, $body, $msg) { + try { + $result = $this->$method($body, $msg); + } catch(Exception $e) { + if (strstr($e->getMessage(), '2006 MySQL server has gone away') !== false) { + Console::output(Console::ansiFormat('Server gone away, try to reconnect', [Console::FG_GREY])); + Yii::$app->db->close(); + Yii::$app->db->open(); + Console::output(Console::ansiFormat('recall method', [Console::FG_GREY])); + $result = $this->$method($body, $msg); + } else { + throw $e; + } + } + + return $result; + } + + /** + * Список аргументов, с которым будет вызван метод \PhpAmqpLib\Channel\AMQPChannel::exchange_declare() + * По умолчанию создаётся очередь с типом fanout. Кроме того, в отличие от стандартных аргументов, + * здесь указано, что auto_delete в false состоянии + * + * @return array + */ + protected function getExchangeDeclareArgs() { + return [$this->getExchangeName(), Component::TYPE_FANOUT, false, false, false]; + } + + /** + * Список аргументов, с которым будет вызван метод \PhpAmqpLib\Channel\AMQPChannel::queue_declare() + * + * Если метод getQueueName() не переопределён и в нём не задано имя очереди, то будет создана + * временная очередь, которая будет автоматически удалена после завершения работы всех Consumer'ов + * Если же есть фиксированное имя очереди, то она будет создана с аргументом + * auto_delete в false (4 индекс массива) + * + * @return array + */ + protected function getQueueDeclareArgs() { + $queueName = $this->getQueueName(); + if ($queueName === null) { + return []; + } else { + return [$queueName, false, false, false, false]; + } + } + + /** + * Список аргументов, с которым будет вызван метод \PhpAmqpLib\Channel\AMQPChannel::queue_bind() + * + * @param string $exchangeName + * @param string $queueName + * @return array + */ + protected function getQueueBindArgs($exchangeName, $queueName) { + return [$queueName, $exchangeName]; + } + + /** + * Список аргументов, с которым будет вызван метод \PhpAmqpLib\Channel\AMQPChannel::basic_consume() + * По умолчанию здесь находятся стандартные аргументы для этого метода + * + * @param string $queueName + * @return array + */ + protected function getBasicConsumeArgs($queueName) { + return [$queueName, '', false, false, false, false, [$this, 'callback']]; + } + + /** + * Logs info and error messages. + * + * TODO: что-то мне подсказывает, что ему тут не место + * + * @param $message + * @param $type + */ + protected function log($message, $type = self::MESSAGE_INFO) { + $format = [$type == self::MESSAGE_ERROR ? Console::FG_RED : Console::FG_BLUE]; + Console::output(Console::ansiFormat($message, $format)); + } + +} diff --git a/common/components/RabbitMQ/Helper.php b/common/components/RabbitMQ/Helper.php new file mode 100644 index 0000000..63a4468 --- /dev/null +++ b/common/components/RabbitMQ/Helper.php @@ -0,0 +1,19 @@ +get('amqp'); + } + + public static function sendToExchange($exchange, $routingKey, $message, $exchangeArgs = []) { + static::getInstance()->sendToExchange($exchange, $routingKey, $message, $exchangeArgs); + } + +} diff --git a/common/components/RabbitMQ/MessageTrait.php b/common/components/RabbitMQ/MessageTrait.php new file mode 100644 index 0000000..d39808e --- /dev/null +++ b/common/components/RabbitMQ/MessageTrait.php @@ -0,0 +1,16 @@ +delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); + } + + public function reject(AMQPMessage $msg, $requeue = true) { + $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], $requeue); + } + +} diff --git a/common/config/main.php b/common/config/main.php index e568d4a..6ec7952 100644 --- a/common/config/main.php +++ b/common/config/main.php @@ -19,5 +19,8 @@ return [ 'redis' => [ 'class' => 'yii\redis\Connection', ], + 'amqp' => [ + 'class' => \common\components\RabbitMQ\Component::class, + ], ], ]; diff --git a/common/helpers/Amqp.php b/common/helpers/Amqp.php new file mode 100644 index 0000000..3b1f321 --- /dev/null +++ b/common/helpers/Amqp.php @@ -0,0 +1,9 @@ + RabbitMQComponent::TYPE_DIRECT, // exchange-type -> direct + 3 => false, // no-ack -> false + ]); + } + + public function getQueueBindArgs($exchangeName, $queueName) { + return [$exchangeName, $queueName, '#']; // Мы хотим получать сюда все события по аккаунту + } + + public function routeChangeUsername($body) { + // TODO: implement this + } + +} diff --git a/console/controllers/base/AmqpController.php b/console/controllers/base/AmqpController.php new file mode 100644 index 0000000..32527f6 --- /dev/null +++ b/console/controllers/base/AmqpController.php @@ -0,0 +1,8 @@ + 6379, 'database' => 0, ], + 'amqp' => [ + 'host' => 'localhost', + 'port' => 5672, + 'user' => 'root', + 'password' => '', + 'vhost' => '/', + ], ], ]; diff --git a/environments/prod/common/config/main-local.php b/environments/prod/common/config/main-local.php index 6fb5f85..9be5f6c 100644 --- a/environments/prod/common/config/main-local.php +++ b/environments/prod/common/config/main-local.php @@ -12,5 +12,12 @@ return [ 'port' => 6379, 'database' => 0, ], + 'amqp' => [ + 'host' => 'localhost', + 'port' => 5672, + 'user' => 'root', + 'password' => '', + 'vhost' => '/', + ], ], ];