这个指南包含使用RabbitMQ创建消息应用的基本方法。
Introduction
RabbitMQ 是一个消息中间件. 基本思想非常简单:它接收并转发消息. 你可以把它比作邮局: 你放邮件到邮箱里,并确信邮递员会把它送到收件人手上. 用这些比喻RabbitMQ是一个邮箱,一个邮局和一个邮递员.
RabbitMQ和邮局最主要的区别是它不处理纸质的邮件而是存储和转发二进制的数据-消息(messages).
RabbitMQ和消息一般使用下面的术语
- 生产无非就是发送.一个发送消息的程序叫生产者. 我们像下面这样画它用 "P":
- 队列是一个邮箱的名. 它存在于RabbitMQ内部. 虽然通过RabbitMQ 和你的应用程序的消息流,他们仅能存储在一个消息队列的内部. 队列是没有任何限制的,它能存储大量你想存储的消息 ? 它基本上是一个无限制的缓冲区. 许多生产者能发送消息到一个队列中,许多消费者能从一个队列中接收消息. 一个队列将被画成下面这样,上面是队列的名字:
- 消费和接收很相似。 一个等候接收消息的程序是一个消费者。 我们像下面这样画它用 "C":
"Hello World"
(使用java客户端)
在指南的这个部分我们将用java写两个程序;一个生产者发送单一的消息,并且一个消费者接收消息并打印消息. 我们将会掩盖java api的一下细节, 而关注怎么入手这样非常简单的事情,这是一个"Hello World!"消息。
在下图中, "P" 是我们的生产者并且"C"是我们的消费者. 中间的盒子是一个队列 -- RabbitMQ 保持消费者行为的一个消息缓冲.
Java 客户端库
RabbitMQ 使用高级消息队列协议AMQP(Advanced Message Queuing Protocol), 它是一个开放的,通用的消息协议. 有大量的语言实现了AMQP的客户端,我们将使用java客户端.下载 client library package,解压这个包在你的工作路径并且从这个路径中获取 JAR 文件:$ unzip rabbitmq-java-client-bin-*.zip $ cp rabbitmq-java-client-bin-*/*.jar ./
现在我们有java客户端和它的依赖,我们能写一些代码了。
发送
我们将调用我们的消息发送者发送消息并且调用我们的消息接收者接收消息. 发送者将连接到RabbitMQ,发送单一消息然后退出.
在Send.java中, 我们需要导入一些类:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
设置类并且命名队列:
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException {
...
}
}
然后我们创建一个到服务器的连接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
这个连接抽象了socket连接,并且关心我们协议版本的协商和认证 。接下来我们创建一个通道, 这些都是通过api来完成.
为了发送,我们必须声明一个队列;然后我们能发布一个消息到这个队列:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
声明一个队列是幂等的;如果它不存在它将被创建。这个消息内容是一个字节数组, 所以你能编码以你喜欢的任何方式.
最后我们关闭通道和连接;
channel.close();
connection.close();
这是整个 Send.java class.
接收
上面的是我们的发送者. 我们的接收者被 RabbitMQ推送消息, 所以不像发送者发送单一的消息, 我们将保持接收者运行来监听消息并打印它们.
下面的代码 (在Recv.java) 是和发送同样重要的:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
这个QueueingConsumer 是一个类我们将使用它缓冲服务器推送给我们的消息。
和设置发送者一样,我们打开一个连接和通道,并且声明我们将消费的队列. 注意和发送者的队列匹配.
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}
注意我们这儿声明了队列.因为我们可以在发送者开始前开始接收者, 我们想确保在我们想消费消息时队列已经存在.
我们将告诉服务器传送我们的消息从队列. 因此它将异步推送我们的消息,我们提供一个回调来缓冲这个消息直到我们使用它。 这就是 QueueingConsumer 的功能.
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
QueueingConsumer.nextDelivery() 阻塞直到其他的消息被服务器传送.
这是整个 Recv.java class.
没有评论:
发表评论