RabbitMQ(三)

   这节我们主要讲RabbitMQ的分发,由生产者发布一个任务,多个接受者去获取任务来进行加工处理。

下面介绍任务分发

  

  一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们就需要更多的工作者来处理,这里就要采用分布机制了。

  我们创建一个新的生产者NewTask

package com.mq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {
    private static final String TASK_QUEUE_NAME = "task_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        
        //分发消息
        for(int i=0; i<10; i++){
            String message = "Hello RabbitMQ" + i;
            //MessageProperties.PERSISTENT_TEXT_PLAIN设置持久化
            channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("NewTask send :" + message);
        }
        channel.close();
        connection.close();
    }
}

  然后我们创建一个Work1和Work2去接收任务,其中Work1和Work2代码一样。

package com.mq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;

public class Work1 {
    private static final String TASK_QUEUE_NAME = "task_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        System.out.println("Worker1  Waiting for messages");
        
        //每次从队列获取的数量
        channel.basicQos(1);
        
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException{
                  String message = new String(body, "UTF-8");
                  System.out.println("Worker1 Received:" + message);
                  try{
                      Thread.sleep(1000); // 暂停1秒钟
                  }catch(Exception e){
                      //此操作中的所有异常将被丢弃
                      channel.abort();
                  }finally{
                      System.out.println("Worker1 Done");
                      //消息处理完成后手工确认,即下面的basicConsume第二个参数要为false。
                      channel.basicAck(envelope.getDeliveryTag(), false);
                  }
            }
        };
        boolean autoAck = false;
        //消息完成确认
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    }
}
package com.mq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Work2 {
    private static final String TASK_QUEUE_NAME = "task_queue";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
        System.out.println("Worker2  Waiting for messages");
        
        //每次从队列获取的数量
        channel.basicQos(1);
        
        final Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException{
                  String message = new String(body, "UTF-8");
                  System.out.println("Worker2 Received:" + message);
                  try{
                      Thread.sleep(1000); // 暂停1秒钟
                  }catch(Exception e){
                      //此操作中的所有异常将被丢弃
                      channel.abort();
                  }finally{
                      System.out.println("Worker2 Done");
                      //消息处理完成后手工确认,即下面的basicConsume第二个参数要为false。
                      channel.basicAck(envelope.getDeliveryTag(), false);
                  }
            }
        };
        boolean autoAck = false;
        //消息完成确认
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
    }
}

运行结果:

  NewTask:

  

  Work1:

  

  Work2:

  

*注1(重要):

  channel.basicQos(1);保证一次只分发一个,默认情况下,RabbitMQ将队列消息随机分配给每个消费者,这时可能出现消息调度不均衡的问题。例如有两台消费者服务器,一个服务器可能非常繁忙,消息不断,另外一个却很悠闲,没有什么负载。RabbitMQ不会主动介入这些情况,还是会随机调度消息到每台服务器。这是因为RabbitMQ此时只负责调度消息,不会根据ACK的反馈机制来分析那台服务器返回反馈慢,是不是处理不过来啊?

  为了解决这个问题,我们可以使用channel.basicQos(1)这个设置。这个设置告诉RabbitMQ,不要一次将多个消息发送给一个消费者。这样做的好处是只有当消费者处理完成当前消息并反馈后,才会收到另外一条消息或任务。这样就避免了负载不均衡的事情了。

*注2(重要):

  autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出,那么就无法获取数据,我们当然是不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知生成者。最后从队列中删除这条信息。如果消费者异常退出,如果还有其他消费者,那么就会把队列中的消息发送给其他消费者,如果没有,等消费者启动时候再次发送。因此一旦将autoAck关闭之后,一定要记得处理完消息之后,向服务器确认消息。否则服务器将会一直转发该消息。如果忘记了向服务器确认处理完消息的话,队列中的信息会一直存在。比如将NewTask运行一次,Work1中注释掉channel.basicAck(envelope.getDeliveryTag(), false); 并且一次获取10条信息,那么每运行一次Work1都会收到队列task_queue的消息。

  因此忘记确认 忘记通过basicAck返回确认信息是常见的错误。这个错误非常严重,将导致消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,而且RabbitMQ也不会主动删除这些被退回的消息。 

*注3(重要):

  但是除了设置ack手动回复以外,还是不够的,如果RabbitMQ-Server突然挂掉了,那么还没有被读取的消息还是会丢失 ,所以我们可以让消息持久化。 只需要在定义Queue时,设置持久化消息就可以了,方法如下:

boolean durable = true;
channel.queueDeclare(channelName, durable, false, false, null);

  这样设置之后,服务器收到消息后就会立刻将消息写入到硬盘,就可以防止突然服务器挂掉,而引起的数据丢失了。但是服务器如果刚收到消息,还没来得及写入到硬盘,就挂掉了,这样还是无法避免消息的丢失。
因为RabbitMQ不做实时立即的磁盘同步(fsync)。这种情况下,对于持久化要求不是特别高的简单任务队列来说,还是可以满足的。如果需要更强大的保证,那么你可以考虑使用生产者确认反馈机制。 
注意,服务器重启后这条队列很有可能会报错,因为已经定义的队列,再次定义是无效的,这就是幂次原理。RabbitMQ不允许重新定义一个已有的队列信息,也就是说不允许修改已经存在的队列的参数。如果你非要这样做,只会返回异常。
因此一个快速有效的方法就是重新声明另一个名称的队列,不过这需要修改生产者和消费者的代码,所以,在开发时,最好是将队列名称放到配置文件中。这时,即使RabbitMQ服务器重启,新队列中的消息也不会丢失。

总结:

  1:不要一次将多个消息给一个消费者,采用负载均衡。

  2:channel.basicConsume()里的ack参数。当从队列当中取出一个消息的时候,RabbitMQ需要应用显式地回馈说已经获取到了该消息。如果一段时间内不回馈,RabbitMQ会将该消息重新分配给另外一个绑定在该队列上的消费者。另一种情况是消费者断开连接,但是获取到的消息没有回馈,则RabbitMQ同样重新分配。如果将该参数设置为true,则RabbimtMQ会为下一个AMQP请求添加一个ack属性,告诉AMQP服务器需要等待回馈。否者,不要等待回馈。大多数时候,你也许想要自己手工发送回馈,例如,需要在回馈之前将消息存入数据库。回馈通常是通过调用 channel.basicAck(deliveryTag, multiple)方法。

  3:持久化队列。并最好将队列名称写在配置文件中。

作者:哀&RT
出处:博客园哀&RT的技术博客--http://www.cnblogs.com/Tony-Anne/
您的支持是对博主最大的鼓励,感谢您的认真阅读。
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
原文地址:https://www.cnblogs.com/Tony-Anne/p/6429751.html