在《rabbitmq学习2:Work Queues 》中我们已经知道了在多个worker如何分配耗时的任务。如果我现在要在远程的机器上运行然后得到结果,那应当怎么做呢?那就要用到RPC(Remote Procedure Call or RPC )了!
关于RPC的介绍请参考百度百科里的关于RPC的介绍:http://baike.baidu.com/view/32726.htm#sub32726
现在来看看来看看Rabbitmq中RPC吧!RPC的工作示意图如下:
上图中的C代表客户端,S表示服务器端;Rabbitmq中的RPC流程如下:
1、首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;
2、服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致
3、客户端从回调Queue中得到先前corrention_id设定的值的处理结果。如果碰到和先前不一样的corrention_id的值,将会忽略而不是抛出异常。
对于上面所提到的回调Queue中的消费处理使用的是BasicProperties类;而消息 属性在AMQP的协议中规定有14个;而很多大部分我们没有用到。常用的几个属性有:
- Message properties
- The AMQP protocol predefine a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
- delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
- content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
- reply_to: Commonly used to name a callback queue.
- correlation_id: Useful to correlate RPC responses with requests.
delivery_mode : 标记消息是持久性消息还是瞬态信息。在前面的“Work Queue”中我们已经提到过;
content_type : 用来描述MIME的类型。如把其类型设定为JSON;
reply_to : 用于命名一个回调Queue;
correlation_id : 用于与相关联的请求的RPC响应.
现在我们就开始RPC的程序吧!
client的代码如下:
- package com.abin.rabbitmq;
- import java.util.UUID;
- import com.rabbitmq.client.AMQP.BasicProperties;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class RPCClient {
- private Connection connection;
- private Channel channel;
- private String requestQueueName = "rpc_queue";
- private String replyQueueName;
- private QueueingConsumer consumer;
- public RPCClient() throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- connection = factory.newConnection();
- channel = connection.createChannel();
- replyQueueName = channel.queueDeclare().getQueue();
- consumer = new QueueingConsumer(channel);
- channel.basicConsume(replyQueueName, true, consumer);
- }
- public String call(String message) throws Exception {
- String response = null;
- String corrId = UUID.randomUUID().toString();
- BasicProperties props = new BasicProperties();
- props.setReplyTo(replyQueueName);
- props.setCorrelationId(corrId);
- channel.basicPublish("", requestQueueName, props, message.getBytes());
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- if (delivery.getProperties().getCorrelationId().equals(corrId)) {
- response = new String(delivery.getBody(), "UTF-8");
- break;
- }
- }
- return response;
- }
- public void close() throws Exception {
- connection.close();
- }
- public static void main(String[] argv) {
- RPCClient fibonacciRpc = null;
- String response = null;
- try {
- fibonacciRpc = new RPCClient();
- System.out.println(" [x] Requesting fib(30)");
- response = fibonacciRpc.call("30");
- System.out.println(" [.] Got '" + response + "'");
- System.out.println(" [x] Requesting fib(-1)");
- response = fibonacciRpc.call("-1");
- System.out.println(" [.] Got '" + response + "'");
- System.out.println(" [x] Requesting fib(a)");
- response = fibonacciRpc.call("a");
- System.out.println(" [.] Got '" + response + "'");
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (fibonacciRpc != null) {
- try {
- fibonacciRpc.close();
- } catch (Exception ignore) {
- }
- }
- }
- }
- }
server的代码如下:
- package com.abin.rabbitmq;
- import com.rabbitmq.client.AMQP.BasicProperties;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class RPCServer {
- private static final String RPC_QUEUE_NAME = "rpc_queue";
- private static int fib(int n) {
- if (n > 1)
- return fib(n - 1) + fib(n - 2);
- else
- return n;
- }
- public static void main(String[] argv) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- connection = factory.newConnection();
- channel = connection.createChannel();
- channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
- channel.basicQos(1);
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
- System.out.println(" [x] Awaiting RPC requests");
- while (true) {
- String response = null;
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- BasicProperties props = delivery.getProperties();
- BasicProperties replyProps = new BasicProperties();
- replyProps.setCorrelationId(props.getCorrelationId());
- try {
- String message = new String(delivery.getBody(), "UTF-8");
- int n = Integer.parseInt(message);
- System.out.println(" [.] fib(" + message + ")");
- response = "" + fib(n);
- } catch (Exception e) {
- System.out.println(" [.] " + e.toString());
- response = "";
- } finally {
- channel.basicPublish("", props.getReplyTo(), replyProps,
- response.getBytes("UTF-8"));
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(),
- false);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (connection != null) {
- try {
- connection.close();
- } catch (Exception ignore) {
- }
- }
- }
- }
- }
先运行服务器端,运行结果如下:
- [x] Awaiting RPC requests
再运行运行客户端,运行结果如下:
- [x] Requesting fib(30)
- [.] Got '832040'
- [x] Requesting fib(-1)
- [.] Got '-1'
- [x] Requesting fib(a)
- [.] Got ''
在服务器还可以出现:
- [.] fib(30)
- [.] fib(-1)
- [.] java.lang.NumberFormatException: For input string: "a"
没有评论:
发表评论