远程过程调用(RPC)
(使用Java客户端)
但是,如果我们需要在远程计算机上运行的功能,并等待结果?嗯,这是一个不同的故事。这种模式通常被称为远程过程调用RPC。
在本教程中,我们要使用RabbitMQ的构建RPC系统:一个客户端和一个可扩展的RPC服务器。正如我们没有任何耗时的任务,是值得分发,我们将创建一个虚拟的返回斐波那契数列的RPC服务。
客户端界面
为了说明RPC服务可以被我们用来创建一个简单的客户端类。要揭露命名的方法调用 发送RPC请求和阻塞,直到接收到应答:
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
RPC的注意事项
虽然RPC是一种很常见的计算模式,它经常被批评。出现问题,当程序员不知道是否是本地的还是一个缓慢的RPC调用一个函数。困惑一样,在一个不可预知的系统,并增加了不必要的复杂性调试。简化了软件,而是误用RPC可以导致难以维护的意大利面条式代码。
铭记这一点,请考虑以下建议:
- 确保函数调用哪个是本地的,哪个是远程的。
- 记录您的系统。组件之间的依赖关系清晰。
- 处理错误的案件。客户端应该如何反应时,RPC服务器是很长一段时间?
当有疑问时避免RPC。如果可以的话,你应该使用异步管道 - 而不是类RPC阻塞,结果被异步推到下一个计算阶段。
回调队列
通常通过RabbitMQ做远程过程调用是很容易的。客户端发送一个请求消息和服务器回复一个响应消息。为了收到一个响应,我们需要发送一个'回调'的请求的队列地址。我们可以使用默认队列(这是在Java客户端独有的)。让我们试试吧:
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
消息属性
AMQP协议预先定义一组14项属性去发送一条消息。但是大部分的属性很少使用,与下列情况除外:
- deliveryMode:将邮件标记为持久(价值2)或瞬态(任何其他值)。从第二个教程, 你也许还记得这个属性。
- 的contentType:用于描述MIME类型的编码。例如经常使用的JSON编码,它是一个很好的做法,将此属性设置为:应用程序/ json。
- REPLYTO:通 常用来命名一个回调队列。
- correlationId:有用的相关RPC请求的响应。
import com.rabbitmq.client.AMQP.BasicProperties;
相关ID
在上述方法中,我们建议创建一个回调队列中的每一个RPC请求。这是非常低效的,但幸运的是有一个更好的方法 - 让每个客户端创建一个单一的回调队列。
这就提出了一个新的问题,收到的响应队列中,它不是明确的请求的响应属于。当的 correlationId属性。我们打算将它设置为一个独特的价值为每一个请求。后来,当我们收到一条消息,在回调队列,我们来看看在这个属性,的基础上,我们将能够匹配的请求的响应。如果我们看到一个未知的 correlationId价值,我们可以安全地丢弃这个消息-它不属于我们的要求。
你可能会问,我们为什么要忽略未知消息在回调队列,而不是失败,错误?这是由于在服务器端的竞争条件的可能性。尽管可能性不大,但它是可能的RPC服务器会死,只是给我们答案后,但在此之前请求确认消息发送。如果发生这种情况,重新启动RPC服务器将处理该请求。这就是为什么在客户端上,我们必须处理的重复响应摆好,和RPC的理想应该是幂等的。
总结
我们的RPC将是这样的:
- 当客户端启动时,它创建了一个匿名的专用回调队列。
- 对于一个RPC请求,客户端发送一个消息,有两个属性: REPLYTO,它被设置为回调队列correlationId的,这是设置一个独特的价值为每一个请求。
- 该请求被发送到一个rpc_queue队列。
- RPC工人(又名:服务器)正在等待该队列的请求。当一个请求出现时,它的工作的结果返回到客户端发送消息,使用队列从REPLYTO领域。
- 客户端等待回调队列中的数据。当消息出现时,它会检查财产correlationId。如果从请求的值匹配时,它返回到应用程序的响应。
全部放在一起
The Fibonacci task:
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}
我们声明斐波纳契功能。它假定只有有效的正整数输入。(不要指望这一个大的数字,它可能是最慢的递归实现)。
为RPC服务器RPCServer.java的 代码看起来像这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | private static final String RPC_QUEUE_NAME = "rpc_queue";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
String response = "" + fib(n);
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
|
服务器代码相当简单:
- 像往常一样,我们开始建立连接,通道和申报队列。
- 我们可能需要运行多个服务器进程。为了同样负载分散到多台服务器上,我们需要设置 prefetchCount设置在channel.basicQos。
- 我们使用访问队列basicConsume。然后我们进入while循环中,我们等待请求消息,做的工作,并发送响应。
的代码我们RPC客户端RPCClient.java :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = java.util.UUID.randomUUID().toString();
BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
|
稍微复杂的客户端代码是:
- 我们建立一个连接通道,并宣布独家'回调'队列回复。
- 我们订阅'回调'队列,这样我们就可以收到RPC响应。
- 我们的的呼叫方法使实际的RPC请求。
- 在这里,我们首先生成一个独特的correlationId 数,并将其保存- while循环将使用这个值来捕捉适当的响应。
- 接下来,我们请求消息发布,两个属性: REPLYTO和correlationId。
- 在这一点上,我们可以坐下来,等到适当的响应到达。
- while循环做一个非常简单的工作,每个响应消息,检查是否correlationId 是我们正在寻找一个。如果是这样,可以节省响应。
- 最后,我们返回的响应返回给用户。
使客户端请求:
RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close();
现在是一个很好的时间来看看我们的完整的示例源代码(包括基本的异常处理)为 RPCClient.java RPCServer.java 。
编译并设置classpath中像往常一样(参见教程一 ):
$ javac -cp rabbitmq-client.jar RPCClient.java RPCServer.java
我们的RPC服务现已准备就绪。我们可以启动服务器:$ java -cp $CP RPCServer [x] Awaiting RPC requests要要求一个斐波那契数,运行客户端:
$ java -cp $CP RPCClient [x] Requesting fib(30)
这里提出的设计是不是唯一可能的执行的RPC服务,但它有一些重要的优点:
- 如果RPC服务器速度太慢,你可以扩展,只是运行另一个。尝试运行第二RPCServer,在一个新的控制台。
- RPC需要在客户端上,只有一个消息的发送和接收。没有同步调用 需要像queueDeclare的。因此,RPC客户端只需要一个网络往返为一个单一的RPC请求。
我们的代码仍然是相当简单的,不设法解决更复杂但重要的问题,如:
- 应该如何反应,如果客户端没有任何服务器上运行?
- 如果一个客户有某种的RPC超时?
- 如果服务器出现故障,并提出了一个例外,它应该被转发到客户端?
- 防止无效的传入消息(如检查突飞猛进,类型),然后再处理。
没有评论:
发表评论