RabbitMQ官方文档翻译之Work Queues(二)

Work Queues (using the Java Client)


In the first tutorial we wrote programs to send and receive messages from a named queue. In this one we'll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.

在第一篇教程里,我们写了一个往队列写消息和从队列读消息的程序。在这一篇教程中,我们将创建一个Work Queue用来在多个workers中分配time-consuming tasks

The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.

This concept is especially useful in web applications where it's impossible to handle a complex task during a short HTTP request window.

工作队列(又称:任务队列)背后的主要思想是避免在程序中直接执行资源密集型任务,这样会使得程序处于必须等待状态,当任务执行完成程序 才会继续执行。 相反,我们可以继续执行后续的任务而不必等待当前密集型任务的完成,通俗的说法就是异步执行。 在RabbitMQ中我们将任务封装成消息,并将其发送到队列。 在后台运行的工作进程(consumer)将取出队列中任务(message)并最终执行作业(对消息进行相应处理)。 当你运行很多工作人员(consumer)时,这些任务将在它们之间共享。

异步的概念在Web应用程序中特别有用,在短时间HTTP请求窗口中无法处理复杂的任务,把它交由异步线程处理。 

Preparation 准备

In the previous part of this tutorial we sent a message containing "Hello World!". Now we'll be sending strings that stand for complex tasks. We don't have a real-world task, like images to be resized or pdf files to be rendered, so let's fake it by just pretending we're busy - by using the Thread.sleep() function. We'll take the number of dots in the string as its complexity; every dot will account for one second of "work". For example, a fake task described by Hello... will take three seconds.

在本教程的前面部分,我们发送了一个包含“Hello World!”的消息。 现在我们将发送的字符串代替复杂任务。 因为我们没有一个现实世界的任务,比如调整图像大小,或者是渲染pdf文件,还是用字符串模拟吧:通过字符串中的.的数量来决定计算的复杂度,每个.都会消耗1s,即sleep(1)。

We will slightly modify the Send.Java code from our previous example, to allow arbitrary messages to be sent from the command line. This program will schedule tasks to our work queue, so let's name it NewTask.java:

我们将稍微修改我们前面的例子中的Send.java代码,以允许从命令行发送任意消息。 该程序将模拟安排密集型任务到我们的工作队列,所以让我们命名为NewTask.java:

String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

Some help to get the message from the command line argument:

从命令行参数获取消息:

private static String getMessage(String[] strings){
    if (strings.length < 1)
        return "Hello World!";
    return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
    int length = strings.length;
    if (length == 0) return "";
    StringBuilder words = new StringBuilder(strings[0]);
    for (int i = 1; i < length; i++) {
        words.append(delimiter).append(strings[i]);
    }
    return words.toString();
}

Our old Recv.java program also requires some changes: it needs to fake a second of work for every dot in the message body. It will handle delivered messages and perform the task, so let's call it Worker.java:

我们之前的Recv.java程序也需要进行一些更改:它将处理传递的消息并利用Thread.sleep()来模拟执行密集型任务,休眠的时间 按message body中的.的个数来累积,我们把它改名为Worker.java:

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
    }
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

Our fake task to simulate execution time:

我们假设的模拟执行任务的时间:

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}    

Compile them as in tutorial one (with the jar files in the working directory and the environment variable CP):

按照教程一在工作目录中的jar文件和环境变量CP中编译

javac -cp $CP NewTask.java Worker.java

Round-robin dispatching

循环分发

One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and that way, scale easily.

RabbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的。如果现在Task加重,那么只需要创建更多的Consumer来进行任务处理即可。当然了,对于负载还要加大怎么办?我没有遇到过这种情况,那就可以创建多个virtual Host,细化不同的通信类别了。

First, let's try to run two worker instances at the same time. They will both get messages from the queue, but how exactly? Let's see.

首先,我们尝试在同一时间运行两个Consumer实例。 他们都会从队列中获取消息,但是究竟如何工作? 让我们来看看。

You need three consoles open. Two will run the worker program. These consoles will be our two consumers - C1 and C2.

   首先开启两个Consumer,即运行两个worker.class

# shell 1
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C

In the third one we'll publish new tasks. Once you've started the consumers you can publish a few messages:

Producer new_task.class要Publish Message了:

# shell 3
java -cp $CP NewTask
# => First message.
java -cp $CP NewTask
# => Second message..
java -cp $CP NewTask
# => Third message...
java -cp $CP NewTask
# => Fourth message....
java -cp $CP NewTask
# => Fifth message.....

Let's see what is delivered to our workers:

注意一下:.代表的sleep(1)。接下来看两个Consumers分别收到了什么消息

java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
java -cp $CP Worker
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.

默认情况下,RabbitMQ 会顺序的分发每个Message。平均每个消费者将获得相同数量的消息。 这种分发消息的方式叫做循环(round-robin)。 

这里的默认情况是指开启了自动确认的机制。consumer每当收到消息就发确认ack,队列收到ack后,会把消息标记为已完成,然后将该Message 从队列中删除,然后将下一个Message分发到下一个Consumer

这种分发还有问题,接着向下读吧。

Message acknowledgment 消息确认

标题消息确认应该理解为手动消息确认,更好理解,也就是只有consumer接收消息并处理完成才发送ACK确认。和下面的例子形成对比。

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the customer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.

  每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。  

这里我们可以理解为采用RabbitMQ采用了自动确认的方式。每次Consumer接到数据后,而不管是否处理完成都会向发ack,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。


But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

 假设一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了。那我们要怎么做

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.

 为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。如果consumer接受到消息并且处理完成,它会发送一个ack给RabbitMQ,告诉RabbitMQ可以安全的删除这个消息。

If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

如果一个consumer die(由于channel关闭,connection关闭或者TCP连接失效),并且销毁之前没有发送ack确认,RabbitMQ将认为这个消息没有处理完,就会把消息从新放回到队列中。此时还有其他的consumer存在的话,RabbitMQ会立刻把消息传送给其他的consumer。这种方式就能确保没有消息丢失,即使有consumer中途die

There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.

RabbitMQ中的消息没有超时机制; 当consumer die 时,RabbitMQ将传递消息给其他的consumer。 这样即使consumer处理消息需要非常长的时间也是可以的

Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the autoAck=true flag. It's time to set this flag to false and send a proper acknowledgment from the worker, once we're done with a task.

消息确认默认情况下打开。

 在以前的例子中,我们通过autoAck = true明确地将消息确认机制关闭。

 现在将autoAck = false,只有consumer处理完消息,才向RabbitMQ server发送正确的确认ACK

channel.basicQos(1); // 一次只接受一封Unack-ed消息(见下文)

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.

这样即使你通过Ctr+C中断了一个consumer,message也不会丢失了,它会被分发到下一个Consumer。

忘记通过basicAck()发送确认是一个很常见的错误。这个错误会产生很严重的后果,如果consumer中断连接,消息将会再次传送给其他的consumer,但是你有没有想到RabbitMQ会因为这些没有被ACK的消息浪费掉宝贵的内存,如果这种错误积累到一定的程度,系统可能会因为内存泄露面临奔溃

Message durability  消息的持久化

We have learned how to make sure that even if the consumer dies, the task isn't lost. But our tasks will still be lost if RabbitMQ server stops.

    在上一节中我们知道了通过关闭auto-ack来确保Consumer die,Message也不会丢失。

    但是如果RabbitMQ Server停止,消息仍然可能丢失。

When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren't lost: we need to mark both the queue and messages as durable.

当RabbitMQ停止或者发生错误,它将丢失queue和message。

我们可以做两件事来确保它们不丢失:1、持久化队列 2、持久化消息

First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:

首先,我们需要确保RabbitMQ不会失去队列。 为了这样做,我们需要将其声明为持久的:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

Although this command is correct by itself, it won't work in our present setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with different name, for example task_queue:

注意:上述queueClare()命令本身是正确的,但是在我们目前的设置中是不行的。 这是因为我们已经定义了一个不耐用的名为hello的队列。 RabbitMQ不允许重新定义具有不同参数的现有队列,并会向尝试执行此操作的程序抛出异常。 有一个快速的解决方法 - 让我们用不同的名称声明一个队列,例如task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

This queueDeclare change needs to be applied to both the producer and consumer code.

如果producer和consumer都queueDeclare(),现在参数做了更改,producer和consumer中的代码都要更改,否则就会像上面所说的发生错误。

At this point we're sure that the task_queue queue won't be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by setting MessageProperties (which implements BasicProperties) to the value PERSISTENT_TEXT_PLAIN.

完成上述操作,我们确信,即使RabbitMQ重新启动,task_queue队列也不会丢失。 现在我们需要为我们的消息做持久化

做法:设置消息的属性为MessageProperties、PERSISTENT_TEXT_PLAIN    (更多的消息属性可以看MessageProperties类)

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
  通过上述的操作将消息标记为持久性,就能完全保证消息不会丢失吗?                                                                                                                     答案是否定的。                                                                                                                                                                                                 1.因为当RabbitMQ接收消息到保存消息的过程需要时间,在这个时间窗口中RabbitMQ停止工作,消息仍旧可能丢失                                         2.此外RabbitMQ不会对每个消息执行fsync(fsnyc就是同步内存中修改的数据到磁盘)                                                                                     3.它可能只是把消息保存在缓存中,而不是磁盘中


Fair dispatch 公平分发

You might have noticed that the dispatching still doesn't work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.

您可能已经注意到,分发有时候不是非常的满意。 例如有两个consumer,队列中有些消息很heavy,有些消息很light,但是RabbitMQh还是按顺序不断的发送消息给consumer。这样可能会有一个consumer不断的分配到heavy的message而不断busy,另一个consumer却分配到的都是light的message而被idle。 那么,RabbitMQ不知道什么,还会平均分配消息。但RabbitMQ并不清楚消息的heavy和light,还是会平均的分配消息,这样公平分发有时显的并不是十分合理!

This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

这是因为当消息进入队列时,RabbitMQ只会分派消息。 它不会看消费者的未确认消息的数量。 它只是盲目地向第n个消费者发送每个第n个消息。

这样就会造成一些Consumer工作比较重,而有的Consumer基本没事可做,有的Consumer却是毫无休息的机会。那么,RabbitMQ是如何处理这种问题呢?

In order to defeat that we can use the basicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

为了解决这个问题我们可以使用basicQos方法设置prefetchCount = 1。 这告诉RabbitMQ不要一次给一个工作者多个消息,换句话说,在consumer处理并确认前一个消息之前,不要向consumer发送新消息。 相反,它将把message发送到下一个还idle的consumer

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意队列大小,如果所有的consumer都处于busy状态,此时队列还在不断接受Message,队列可能会填满。 你应该要留意,增加更多的consumer或者添加其他的策略。

Putting it all together  把他们合在一起

Final code of our NewTask.java class:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }      
  //...
}

(NewTask.java source)

And our Worker.java:

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);//设置auto-ack为false,关闭自动确认
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
      if (ch == '.') {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}

(Worker.java source)

Using message acknowledgments and prefetchCount you can set up a work queue. The durability options let the tasks survive even if RabbitMQ is restarted.

For more information on Channel methods and MessageProperties, you can browse the javadocs online.

Now we can move on to tutorial 3 and learn how to deliver the same message to many consumers.


原文地址:https://www.cnblogs.com/chenny3/p/10226167.html