Rabbitmq消息队列(三) 工作队列

1、简介

  默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。

  工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。

 2、消息确认

  为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。

  消息响应默认是开启的。之前的例子中我们可以使用autoAck=True标识把它关闭。

 1 channel.basicQos(1);        // 同一时间只接受1条消息,直到处理了1条消息并作出响应
 2 final Consumer consumer = new DefaultConsumer(channel) {
 3     @Override
 4     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 5         String message = new String(body, "UTF-8");
 6         try {
 7             System.out.println(message);
 8             Thread.sleep(10000);
 9         } catch (Exception e) {
10         
11         } finally {
12             channel.basicAck(envelope.getDeliveryTag(), false);         // 不要忘了消息确认,否则内存会约占越多
13         }
14     }
15 };
16 channel.basicConsume(queueName, false, consumer);

  

3、消息持久化

  如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。

  注意:如果已经定义过一个叫hello的非持久化队列。RabbitMq不允许你使用不同的参数重新定义一个名为hello的持久化队列,必须重新进行命名

  (1)将队列声明为持久化(生产者和消费者代码中都得进行调整)

1 boolean durable = true;
2 channel.queueDeclare(queueName, durable, false, false, null);

  (2)生产者端发送消息时,将消息进行持久化

1 channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
原文地址:https://www.cnblogs.com/origalom/p/8334197.html