2013年6月14日星期五

RabbitMQ持久化的PHP使用示例

      在使用RabbitMQ时候想对消息进行持久化,一开始exchange和queue做了持久化设置setFlags(AMQP_DURABLE),重启rabbitmq-server后发现数据丢了。后来看官方手册里的publish方法里有delivery_mode、priority两个属性,delivery_mode:1非持久化,delivery_mode:2持久化,其中priority是设置消息优先级的可设置0-9级别,但是官方解释好像并未实现,测试中发现并未起作用。即使设置了持久化,也不能完全保证消息不丢失,比如RabbitMQ接受到消息后还没来得及写到磁盘就宕机了,那么就会有小概率的信息丢失。

具体使用示例如下:
<?php
//连接RabbitMQ
$conn_args = array('host' =>'localhost', 'port'=> '5672', 'login' =>'guest', 'password'=> 'guest', 'vhost' => '/');
$conn = new AMQPConnection($conn_args);
$conn->connect();

$channel = new AMQPChannel($conn);
$exchangeName = 'reported';
$queueName = 'request';
$queueKey = 'request_item';

if( isset($_GET['read']) ){ //读队列

$q = new AMQPQueue($channel);
$q->setName($queueName);
$q->setFlags(AMQP_DURABLE);
$messages = '';
try {
$q->bind($exchangeName, $queueKey);
$messages = $q->get(AMQP_AUTOACK); //消息获取
!empty($messages) && $messages = json_decode($messages->getBody(), true);
} catch (Exception $e) {
var_dump($e);
}
var_dump($messages);
$conn->disconnect();

}else{ //写队列

$ex = new AMQPExchange($channel);
$ex->setName($exchangeName);
$ex->setType(AMQP_EX_TYPE_TOPIC);
$ex->setFlags(AMQP_DURABLE); //exchange持久化
$ex->declare();

$q = new AMQPQueue($channel);
$q->setName($queueName);
$q->setFlags(AMQP_DURABLE); //queue持久化
$q->declare();
$q->bind($exchangeName, $queueKey);
$channel->startTransaction();
$message = array('content' => 'test','time' => time());

/**
* 消息持久化,delivery_mode:2持久化、delivery_mode:1非持久化,其中priority是设置消息的优先级,测试中发现并未起作用。
* 消息还有其他属性,请参考http://www.php.net/manual/zh/amqpexchange.publish.php
*/
$result = $ex->publish(json_encode($message), $queueKey, AMQP_NOPARAM, array('delivery_mode'=>2, 'priority'=> 9));
var_dump($result);
$channel->commitTransaction();
$conn->disconnect();
}

?>

没有评论:

发表评论