2013年7月30日星期二

Rabbitmq教程翻译(六)Remote procedure call (RPC)远程过程调用

远程过程调用(RPC)

(使用Java客户端)

第二个教程中, 我们学会了如何使用工作队列分发在多个工作者耗时消费的任务。
但是,如果我们需要在远程计算机上运行的功能,并等待结果?嗯,这是一个不同的故事。这种模式通常被称为远程过程调用RPC。
在本教程中,我们要使用RabbitMQ的构建RPC系统:一个客户端和一个可扩展的RPC服务器。正如我们没有任何耗时的任务,是值得分发,我们将创建一个虚拟的返回斐波那契数列的RPC服务。

客户端界面

为了说明RPC服务可以被我们用来创建一个简单的客户端类。要揭露命名的方法调用 发送RPC请求和阻塞,直到接收到应答:
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();   
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

RPC的注意事项

虽然RPC是一种很常见的计算模式,它经常被批评。出现问题,当程序员不知道是否是本地的还是一个缓慢的RPC调用一个函数。困惑一样,在一个不可预知的系统,并增加了不必要的复杂性调试。简化了软件,而是误用RPC可以导致难以维护的意大利面条式代码。
铭记这一点,请考虑以下建议:
  • 确保函数调用哪个是本地的,哪个是远程的。
  • 记录您的系统。组件之间的依赖关系清晰。
  • 处理错误的案件。客户端应该如何反应时,RPC服务器是很长一段时间?
当有疑问时避免RPC。如果可以的话,你应该使用异步管道 - 而不是类RPC阻塞,结果被异步推到下一个计算阶段。

回调队列

通常通过RabbitMQ做远程过程调用是很容易的。客户端发送一个请求消息和服务器回复一个响应消息。为了收到一个响应,我们需要发送一个'回调'的请求的队列地址。我们可以使用默认队列(这是在Java客户端独有的)。让我们试试吧:
callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

消息属性

AMQP协议预先定义一组14项属性去发送一条消息。但是大部分的属性很少使用,与下列情况除外:
  • deliveryMode:将邮件标记为持久(价值2)或瞬态(任何其他值)。从第二个教程, 你也许还记得这个属性。
  • 的contentType:用于描述MIME类型的编码。例如经常使用的JSON编码,它是一个很好的做法,将此属性设置为:应用程序/ json
  • REPLYTO:通 ​​常用来命名一个回调队列。
  • correlationId:有用的相关RPC请求的响应。
我们需要这种新的进口:
import com.rabbitmq.client.AMQP.BasicProperties;

相关ID

在上述方法中,我们建议创建一个回调队列中的每一个RPC请求。这是非常低效的,但幸运的是有一个更好的方法 - 让每个客户端创建一个单一的回调队列。
这就提出了一个新的问题,收到的响应队列中,它不是明确的请求的响应属于。当的 correlationId属性。我们打算将它设置为一个独特的价值为每一个请求。后来,当我们收到一条消息,在回调队列,我们来看看在这个属性,的基础上,我们将能够匹配的请求的响应。如果我们看到一个未知的 correlationId价值,我们可以安全地丢弃这个消息-它不属于我们的要求。
你可能会问,我们为什么要忽略未知消息在回调队列,而不是失败,错误?这是由于在服务器端的竞争条件的可能性。尽管可能性不大,但它是可能的RPC服务器会死,只是给我们答案后,但在此之前请求确认消息发送。如果发生这种情况,重新启动RPC服务器将处理该请求。这就是为什么在客户端上,我们必须处理的重复响应摆好,和RPC的理想应该是幂等的。

总结

我们的RPC将是这样的:
  • 当客户端启动时,它创建了一个匿名的专用回调队列。
  • 对于一个RPC请求,客户端发送一个消息,有两个属性: REPLYTO,它被设置为回调队列correlationId的,这是设置一个独特的价值为每一个请求。
  • 该请求被发送到一个rpc_queue队列。
  • RPC工人(又名:服务器)正在等待该队列的请求。当一个请求出现时,它的工作的结果返回到客户端发送消息,使用队列从REPLYTO领域。
  • 客户端等待回调队列中的数据。当消息出现时,它会检查财产correlationId。如果从请求的值匹配时,它返回到应用程序的响应。

全部放在一起

The Fibonacci task:
private static int fib(int n) throws Exception {
    if (n == 0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
}
我们声明斐波纳契功能。它假定只有有效的正整数输入。(不要指望这一个大的数字,它可能是最慢的递归实现)。
为RPC服务器RPCServer.java的 代码看起来像这样:
1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private static final String RPC_QUEUE_NAME = "rpc_queue";

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println(" [x] Awaiting RPC requests");

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();

    BasicProperties props = delivery.getProperties();
    BasicProperties replyProps = new BasicProperties
                                     .Builder()
                                     .correlationId(props.getCorrelationId())
                                     .build();

    String message = new String(delivery.getBody());
    int n = Integer.parseInt(message);

    System.out.println(" [.] fib(" + message + ")");
    String response = "" + fib(n);

    channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
服务器代码相当简单:
  • 像往常一样,我们开始建立连接,通道和申报队列。
  • 我们可能需要运行多个服务器进程。为了同样负载分散到多台服务器上,我们需要设置 prefetchCount设置在channel.basicQos。
  • 我们使用访问队列basicConsume。然后我们进入while循环中,我们等待请求消息,做的工作,并发送响应。
的代码我们RPC客户端RPCClient.java :
1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;

public RPCClient() throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    connection = factory.newConnection();
    channel = connection.createChannel();

    replyQueueName = channel.queueDeclare().getQueue(); 
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
}

public String call(String message) throws Exception {     
    String response = null;
    String corrId = java.util.UUID.randomUUID().toString();

    BasicProperties props = new BasicProperties
                                .Builder()
                                .correlationId(corrId)
                                .replyTo(replyQueueName)
                                .build();

    channel.basicPublish("", requestQueueName, props, message.getBytes());

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        if (delivery.getProperties().getCorrelationId().equals(corrId)) {
            response = new String(delivery.getBody());
            break;
        }
    }

    return response; 
}

public void close() throws Exception {
    connection.close();
}
稍微复杂的客户端代码是:
  • 我们建立一个连接通道,并宣布独家'回调'队列回复。
  • 我们订阅'回调'队列,这样我们就可以收到RPC响应。
  • 我们的的呼叫方法使实际的RPC请求。
  • 在这里,我们首先生成一个独特的correlationId 数,并将其保存- while循环将使用这个值来捕捉适当的响应。
  • 接下来,我们请求消息发布,两个属性: REPLYTOcorrelationId
  • 在这一点上,我们可以坐下来,等到适当的响应到达。
  • while循环做一个非常简单的工作,每个响应消息,检查是否correlationId 是我们正在寻找一个。如果是这样,可以节省响应。 
  • 最后,我们返回的响应返回给用户。
使客户端请求:

RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");   
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();
现在是一个很好的时间来看看我们的完整的示例源代码(包括基本的异常处理)为 RPCClient.java RPCServer.java 。
编译并设置classpath中像往常一样(参见教程一 ):
$ javac -cp rabbitmq-client.jar RPCClient.java RPCServer.java
我们的RPC服务现已准备就绪。我们可以启动服务器:
$ java -cp $CP RPCServer
 [x] Awaiting RPC requests
要要求一个斐波那契数,运行客户端:
$ java -cp $CP RPCClient
 [x] Requesting fib(30)
这里提出的设计是不是唯一可能的执行的RPC服务,但它有一些重要的优点:
  • 如果RPC服务器速度太慢,你可以扩展,只是运行另一个。尝试运行第二RPCServer,在一个新的控制台。
  • RPC需要在客户端上,只有一个消息的发送和接收。没有同步调用 需要像queueDeclare的。因此,RPC客户端只需要一个网络往返为一个单一的RPC请求。
我们的代码仍然是相当简单的,不设法解决更复杂但重要的问题,如:
  • 应该如何反应,如果客户端没有任何服务器上运行?
  • 如果一个客户有某种的RPC超时?
  • 如果服务器出现故障,并提出了一个例外,它应该被转发到客户端?
  • 防止无效的传入消息(如检查突飞猛进,类型),然后再处理。

没有评论:

发表评论