RPC

时间:2017-06-17 19:13:49   收藏:0   阅读:355

RPC

 

  1. 当客户端启动,它创建一个匿名的并且是exclusive的回调queue。

  2. 在一次RPC请求中,客户端发送的消息有两个属性:replyTo,放置的是回调queue的信息。correlationId,放置的是每个请求唯一的值。

  3. 请求被发送到一个rpc_queue中。

  4. RPC服务端在queue的另一端等待请求。当请求到来时,它处理任务并将消息的结果发送回客户端,使用replyTo中设置的queue。

  5. 客户端在回调queue中等待响应的数据,当消息出现时,它先检查correlationId属性。如果匹配的话就将结果返回到应用中。

Callback queue

使用RabbitMQ来进行RPC是非常简单的。客户端发送一个请求到服务端,服务端接收后返回响应的消息。为了接收到响应的消息,我们需要在请求中发送一个callback 的queue地址。我们可以使用默认的queue(在Java的client中它是exclusive的)。

 1 callbackQueueName = channel.queueDeclare().getQueue();
 2 
 3 BasicProperties props = new BasicProperties
 4                             .Builder()
 5                             .replyTo(callbackQueueName)
 6                             .build();
 7 
 8 channel.basicPublish("", "rpc_queue", props, message.getBytes());
 9 
10 // ... then code to read a response message from the callback_queue ...

Message properties

AMQP协议预定义了消息的14种属性。大部分的都很少使用,除了以下这些:

在之前的方法中我们建议为每个RPC请求创建一个回调queue。这显得有点影响性能,幸运的是有一种更好的方式——每个客户端只创建一个回调queue。 但这产生了一个新问题,无法将相应的Response和Request对应起来。这个时候就需要用到correlationId属性。对于每个请求它都将有一个唯一的值。 当我们在回调queue中接收到消息之后,检查该属性,看是否与Request匹配。如果是一个未知的correlationId值,那么我们可以安全的忽略这条消息, 因为它不属于我们的请求。

你也许会问,为什么我们应该忽略回调queue中未知的消息而不是抛出异常?这是因为服务端可能会出现竞争条件。尽管不太常见,但是也有可能RPC server在发送响应后挂了, 并且也没有接收到客户端发送的ack。如果发生了这种情况,RPC server在重启后将会重新处理这个请求。这就是为什么在客户端我们需要优雅的处理重复的响应, RPC应该是幂等的。

 1 package com.rabbitmq.www.publish_subscribe.rpc;
 2 
 3 import com.rabbitmq.client.ConnectionFactory;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.AMQP;
 8 import com.rabbitmq.client.Envelope;
 9 
10 import java.io.IOException;
11 import java.util.UUID;
12 import java.util.concurrent.ArrayBlockingQueue;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.TimeoutException;
15 
16 public class RPCClient {
17 
18   private Connection connection;
19   private Channel channel;
20   private String requestQueueName = "rpc_queue";
21   private String replyQueueName;
22   
23   private final static String HOST_ADDR = "172.18.112.102";
24 
25   public RPCClient() throws IOException, TimeoutException {
26     ConnectionFactory factory = new ConnectionFactory();
27     factory.setHost(HOST_ADDR);
28 
29     connection = factory.newConnection();
30     channel = connection.createChannel();
31 
32     replyQueueName = channel.queueDeclare().getQueue();
33   }
34 
35   public String call(String message) throws IOException, InterruptedException {
36     String corrId = UUID.randomUUID().toString();
37 
38     AMQP.BasicProperties props = new AMQP.BasicProperties
39             .Builder()
40             .correlationId(corrId)
41             .replyTo(replyQueueName)
42             .build();
43     //调用服务端请求(在制定的queue上发布消息)
44     channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
45 
46     final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
47 
48     channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
49       @Override
50       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
51         if (properties.getCorrelationId().equals(corrId)) {
52           response.offer(new String(body, "UTF-8"));
53         }
54       }
55     });
56 
57     return response.take();
58   }
59 
60   public void close() throws IOException {
61     connection.close();
62   }
63 
64   public static void main(String[] argv) {
65     RPCClient fibonacciRpc = null;
66     String response = null;
67     try {
68       fibonacciRpc = new RPCClient();
69 
70       System.out.println(" [x] Requesting fib(30)");
71       response = fibonacciRpc.call("30");
72       System.out.println(" [.] Got ‘" + response + "‘");
73     }
74     catch  (IOException | TimeoutException | InterruptedException e) {
75       e.printStackTrace();
76     }
77     finally {
78       if (fibonacciRpc!= null) {
79         try {
80           fibonacciRpc.close();
81         }
82         catch (IOException _ignore) {}
83       }
84     }
85   }
86 }
package com.rabbitmq.www.publish_subscribe.rpc;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {

  private static final String RPC_QUEUE_NAME = "rpc_queue";
  
  private final static String HOST_ADDR = "172.18.112.102";

  private static int fib(int n) {
    if (n ==0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
  }

  public static void main(String[] argv) {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(HOST_ADDR);

    Connection connection = null;
    try {
      connection      = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

      channel.basicQos(1);

      System.out.println(" [x] Awaiting RPC requests");

      Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                  .Builder()
                  .correlationId(properties.getCorrelationId())
                  .build();

          String response = "";

          try {
            String message = new String(body,"UTF-8");
            int n = Integer.parseInt(message);

            System.out.println(" [.] fib(" + message + ")");
            response += fib(n);
          }
          catch (RuntimeException e){
            System.out.println(" [.] " + e.toString());
          }
          finally {
            channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));

            channel.basicAck(envelope.getDeliveryTag(), false);
          }
        }
      };

      channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

      //loop to prevent reaching finally block
      while(true) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException _ignore) {}
      }
    } catch (IOException | TimeoutException e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null)
        try {
          connection.close();
        } catch (IOException _ignore) {}
    }
  }
}

server端的代码非常直观:

客户端代码有一点点的复杂:

 

原文:http://www.cnblogs.com/woms/p/7040873.html

评论(0
© 2014 bubuko.com 版权所有 - 联系我们:wmxa8@hotmail.com
打开技术之扣,分享程序人生!