消息队列 (4) 远程调用

前面都是本地服务器,如果需要远程环境运行一个方法,等待结果。这种模式称为远程过程调用或者RPC。

使用Rabbit搭建一个RPC系统,一个客户端和一个扩展的RPC服务器。

Callback queue

  一般做RPC在RabbitMQ中是比较容易的,一个客户端发送一个请求信息和一个响应信息的服务器回复,为了得到一个响应,我们需要发送一个回调队列地址请求。如下:

  String replyQueueName = channel.queueDeclare().getQueue();
  AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().replyTo(replyQueueName).build();

Message属性:

  AMQP协议一共预定义了14个属性,但是大多数属性很少使用,下面几个比较常用

    deliveryMode:有2个值,一个是持久,另一个表示短暂。

    contentType:内容类型,用来描述编码的MIME类型。例如,经常使用JSON编码是将此属性设置为一个很好的做法:application/json。

    replyTo:经常使用的是回调队列的名字

    correlationid:RPC响应请求的相关应用

Correlation Id

  在队列上接收到一个响应,但它并不清楚响应属于哪一个,当我们使用CorrelationId属性的时候,我们就可以将它设置为每个请求的唯一值,稍后当我们在回调队列中接收消息的时候,我们会看到这个属性,如果我们看到一个位置的CorrelationId,我们就可以安全地忽略信息-他不属于我们的请求。为什么我们应该忽略位置的消息在回调队列中,而不是失败的错误?这是由于服务端的一个竞争条件可能性。比如还未发送一个确认信息给请求,但是此时RPC服务器挂了。如果在何种情况发生,将再次重启RPC服务器处理请求。这就是为什么在客户端必须处理重复的反应。

需求

我们的RPC工作方式如下:

  1.当客户端启动时,它创建一个匿名的独占回调队列。

  2.对于RPC请求,客户端发送2个属性,一个是replyTo设置回调队列,另一个是correlationId每个队列设置唯一值

  3.请求被发送到一个rpc_queue队列中

  4.rpc服务器是等待队列的请求,当收到一个请求的时候,他就把消息返回的结果返回给客户端,使请求结束。

  5.客户端等待回调队列上的数据,当消息出现的时候,他检查correlationId,如果它和从请求返回的值匹配,就进行相应。

服务端代码

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {
    private static final String RPC_QUEUE_NAME="rpc_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME,false,false,false,null);
        channel.basicQos(1);
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME,false,consumer);

        System.out.println("RPCServer 等待 RPC 请求");

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            BasicProperties props = delivery.getProperties();
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(props.getCorrelationId()).build();

            String message = new String(delivery.getBody(),"UTF-8");
            int n = Integer.parseInt(message);

            System.out.println("RPCServer fib("+message+")");
            String response = "" + fib(n);
            channel.basicPublish("",props.getReplyTo(), replyProps,response.getBytes());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        }
    }
    private static int fib(int n) {
        if (n == 0) {
            return 0;
        }
        if (n == 1) {
            return 1;
        }
        return fib(n - 1) + fib(n - 1);
    }
}

服务器代码实现功能如下:

  1.建立连接,通道,队列

  2.我们可能运行多个服务器进程,为了分散负载服务器压力,我们设置channel.basicQos(1);

  3.我们用basicconsume访问队列,然后进入循环,在其中我们等待请求信息并处理消息然后发送相应。

客户端代码

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String Call(String message) throws IOException, InterruptedException {
        String response;
        String coorrId = UUID.randomUUID().toString();
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(coorrId).replyTo(replyQueueName).build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(coorrId)) {
                response = new String(delivery.getBody(), "UTF-8");
                break;
            }
        }
        return response;
    }

    public void close() throws IOException {
        connection.close();
    }

    public static void main(String[] args) throws IOException {
        RPCClient rpcClient = null;
        String response;
        try {
            rpcClient = new RPCClient();
            System.out.println("PRCClient Requesting fib(20)");
            response = rpcClient.Call("20");
            System.out.println("RPCClient Got :" + response);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (rpcClient != null) {
                rpcClient.close();
            }
        }
    }
}

客户端实现功能如下:

  1.建立一个连接和通道,并声明了一个唯一的回调队列的答复

  2.我们订阅回调队列,这样就可以得到RPC的相应

  3.定义一个Call方法用于发送当前的回调请求

  4.生成一个唯一的correlationid,然后通过while循环来捕获合适的回应。

  5.我们请求信息,发送2个属性replyTo和correlationId

  6.然后就是等待直到有合适的回应到达

  7.while循环判断每一个响应信息,是否有correlationid然后进行匹配,然后就是进行响应。

  8.最后把响应返回到客户端

原文地址:https://www.cnblogs.com/baidawei/p/9176676.html