Storm保证消息处理

Guaranteeing Message Processing

Storm保证每一个tuple被完全处理。Strom中一个核心的机制是它提供了一种跟踪tuple血统的能力,它使用了一种十分有效的方式跟踪topology中的tuple。

Storm中最基本的抽象是提供了至少一次(at-least-once)处理的保证,当你使用队列系统的时候也可以提供相同的保证。

Messages are only replayed when there are failures.(消息只有在失败的时候才会被重新投放)

Trident是在基本抽象之上的更高层面的抽象,它可以实现精准的只执行一次的处理。

对于保证消息处理,Storm提供了几种不同的级别,包括尽最大努力、至少一次处理、精确的一次处理。( best effort、at least once、exactly once)

一个消息被“完全处理”是什么意思?

从一个spout出来的一个tuple可能触发上千个tuples。考虑下面的例子:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com",
                                               22133,
                                               "sentence_queue",
                                               new StringScheme()));
builder.setBolt("split", new SplitSentence(), 10)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 20)
        .fieldsGrouping("split", new Fields("word"));

这个topology读取从一个Kestrel队列出来的句子,并且将句子分割成单词,然后发送每个单词已经它们出现的次数。从一个spout出来的一个tuple可能触发上千个tuples在这里的意思是:每个单词都是一个tuple,并且每个每个单词都要更新它们出现的次数。这个例子中,消息树可能看起来是这样的:

当这个tuple树被耗尽并且树中每个消息都已经被处理了,那么这个时候这个从一个spout出来的一个tuple被称之为“完全处理”。当一个消息树中的消息在指定的超时时间内没有被完全处理,则认为这个tuple失败了。默认的超时时间是30秒。

如果一个消息被完全处理或者失败了将会发生什么?

为了理解这个问题,让我们看一下从spout处理的一个tuple的生命周期。(let's take a look at the lifecycle of a tuple coming off of a spout)

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

首先,Storm通过在Spout中调用nextTuple方法从Spout中请求一个tuple。这个Spout使用open方法中提供的SpoutOutputCollector来发送一个tuple到其中一个streams。在发送一个tuple的时候,这个Spout提供一个“message id”用来表示这个tuple。发送消息的代码可能看起来是这样的:

_collector.emit(new Values("field1", "field2", 3) , msgId);

接下来,这个tuple达到bolt,并且Storm对于跟踪这个tuple的消息树很关心。如果Storm发现一个tuple被完全处理,那么Storm将会调用原始的那个带有message id的Spout中的ack方法。同样的,如果一个tuple超时,那么Storm将会调用那个Spoutfail方法。注意,一个tuple只能被创建它的那个Spout任务所acked或者failed。

让我们用KestrelSpout再回顾一遍Spout是怎样保证消息处理的。当KestrelSpout从Kestrel队列中消费了一个消息以后,它“opens”这个消息。意思是这个消息并没有实际的从队列中删除,代替的只是标记这个消息为“pending”状态,等待消息被完成以后的确认。处于pending状态的消息不会被发送给队列的其它消费者。另外,如果一个客户端连接断开了,那么它所持有的所有pending状态的消息将会被重新放回队列。当一个消息被打开的时候,Kestrel提供这个消息数据给客户端,并且给这个消息一个唯一的id。当发送tuple给SpoutOutputCollector的时候KestrelSpout用这个精确的id作为这个tuple的“message id”。随后,当这个KestrelSpout上的ack或者fail被调用的时候,这个KestrelSpout发送ack或者fail消息给Kestrel,同时还带上message id,以决定是从队列中删除这个消息还是将这个消息重新放回队列。

Storm的可靠性API是什么?

想要使用Storm的可靠性,有两件事情是你必须做的。第一,无论何时你在为一个tuple树创建一个新的链接的时候你必须告诉Storm。第二,你必须告诉Storm什么时候一个单个的tuple算是完成处理了。通过做以上两件事情,Storm就可以发现什么时候这个tuple的树是被完全处理了,并且在适当的时候ack或者fail这个tuple。Storm的API提供了一些简洁的方式来做上面说的这些事情。

Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple.

让我们用下面这个bolt作为例子来具体看一下:

public class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

Each word tuple is anchored by specifying the input tuple as the first argument to emit. Since the word tuple is anchored, the spout tuple at the root of the tree will be replayed later on if the word tuple failed to be processed downstream.(这段话的意思是,调用emit方法的时候指定它的第一个参数tuple的时候,这个被指定的tuple就被anchored了,一旦这个tuple被anchored以后它将作为它的消息tree的root,如果这个tuple在随后处理失败了,这个tuple会被重新投放)

与之相对的,我们再看一下下面这种发送方式:

_collector.emit(new Values(word));

这种方式会造成这个unanchored,也就是说,用这种方式的话tuple就不能被anchored。如果这个tuple在下游被处理失败了,那么根tuple不会被重新投放。根据你的系统的容错性要求来决定,有时候用一个unanchored的tuple也很合适。

一个输出tuple可以被anchored到多个输入tuple。在做stream的join或者聚集很有用。一个multi-anchored的tuple失败会造成多个tuple被重放。例如:

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

DAG和tree是一回事,只是在先前的版本中叫tree。

当你已经完成了对tuple树中某一个tuple的处理的时候,你可以调用OutputCollectorack或者fail方法。

你可以在OutputCollector中用fail方法来立即失败这个tuple。通过明确失败这个tuple,这个tuple可以被快速重放而不是等待这个tuple超时以后再重放。

你处理的每一个tuple都必须被acked或者failed。Storm用内存来跟踪每一个tuple,因此如果你不ack/fail每一个tuple,那么这个任务将会一直存在于内存中。

大量的bolts有相同的行为模式:它们读取一个输入tuple,然后基于这个tuple发送tuple,在execute方法的最后ack这个tuple。Storm有一个叫做BasicBolt的接口已经为你封装好了这种模式。下面这个例子就是:

public class SplitSentence extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
 }

这种实现方式比前面的例子中那种方式更简单,而且表达的语义完全相同。Tuple被发送给BasicOutputCollector的时候自动被anchored,并且输入tuple在execute方法执行完以后自动被acked。

考虑到tuple会被重放,我应该怎么做才能使我的应用正常工作?

视情况而定

Storm如何有效地实现可靠性?

一个Storm topology有一组特殊的“acker”任务跟踪每一个spout tuple的DAG(有向无环图)。当一个acker看到一个DAG被完成的时候,它一个消息给那个创建这个spout tuple的spout task来确认这个消息。你可以设置一个topology的acker任务的数量,默认是每个woker一个这样的task。

理解Storm的可靠性实现最好的方式就是看tuples的生命周期和tuple的DAG。当一个tuple在topology中被创建的时候,无论是在spout中还是在bolt中被创建,都会被指定一个64位的id。这些id被acker用来跟踪每个spout tuple的DAG。

当你在一个bolt中发送一个新的tuple的时候,已经被ahchored过的那些tuple的id会被复制到新的tuple中,因此,每一个tuple都知道它所在的tuple树中所有spout tuples的id。当tuple被确认的时候,它给acker任务发一个消息告诉acker这个tuple树怎样变化的。特别的,它会告诉acker“我现在已经这个spout tuple的树中完成了,这有一个新tuple被anchored”。

现在,你已经理解了可靠性的算法,让我们来复习一下所有失败的情况,并且看一下对于每一中情况Strom是如何避免数据丢失的:

  • A tuple isn't acked because the task died:这种情况下,失败的那个tuple的tuple树的根tuple将会超时并且重放
  • Acker task dies:这种情况下,这个acker跟踪的所有的spout tuple都将超时并且重放
  • Spout task dies:这种情况下,由spout的源来负责消息的重放。例如:队列想Kestrel和RabbitMQ在客户端连接断开的时候将所有pending状态的消息重新放回队列。

本节重点

1、一个tuple的tuple树被耗尽并且树中的每个消息都被处理,则称这个tuple被完全处理。反之,没有被完全处理则认为这个tuple失败。默认处理超时时间是30秒。

2、Strom通过调用Spot中的nextTuple方法请求一个tuple。在Spout中发送一个tuple的时候,Spout提供一个"message id"用于标识这个tuple。

3、如果Storm发现一个tuple被完成处理,那么将会在原始的那个Spout中调用ack方法,同时将message id作为参数。同样的,如果tuple失败,则调用那个Spout中的fail方法。注意,tuple被acked或者failed都是在创建这个tuple的那同一个Spout中调用。

4、Specifying a link in the tuple tree is called anchoring. Anchoring is done at the same time you emit a new tuple. 

5、在Bolt中将特定的输入tuple作为OutputCollector的emit方法第一个参数,那么这个输出Bolt是被anchored的,因而如果这个输出tuple在下游被处理失败的话,这个tuple所在的tuple树的根tuple将会被重放。否则,如果没有将输入tuple作为第一个参数的话,则是unanchored,进而失败的时候也不会被重放。

6、Storm topology有专门的task来跟踪每一个spout tuple的DAG,这些特殊的task被叫做"acker"。默认每个worker一个acker。当acker发现一个DAG完成的时候,它发消息给创建这个spout tuple的spout task以此来确认这个消息。

7、在topology中,当一个tuple被创建的时候,都会被指定一个64位的id。acker用这个id来跟踪每个tuple。

8、由于tuple可能会重放,因此在处理的时候需要根据message id去重

进一步精简

每一个从Spout出来的tuple都带着一个"message id",重放的时候会用到这个id。

每一个tuple在创建的时候都被指定了一个64位的id,在acker跟踪tuple的DAG(也叫tuple tree)会用到。

每一个tuple被acked的时候,它会发一个消息告诉acker这个tuple tree发生了怎样的改变

一个tuple从Spout出来,随后会经历很多个Bolt,在一步的Bolt中都可能产生新的tuple。因此,由这个原始的tuple开始将会衍生出很多tuple,这些tuple构成一个tuple tree(也叫tuple DAG)。只有当tree耗尽并且其中所有的tuple都处理完成,这个tuple才叫完全处理,否则tuple失败。tuple完全处理后将会在原始那个Spout上调用ack方法,并且用message id作为参数,tuple失败以后同样在那个Spout上调用fail方法。当tuple tree中某一个tuple完成处理以后可以调用OutputCollector的ack方法确认,这个tuple便会从DAG中删除。当acker发现DAG完成以后,发消息告诉那个Spout task来确认这个消息。

参考 http://storm.apache.org/releases/current/Guaranteeing-message-processing.html

原文地址:https://www.cnblogs.com/cjsblog/p/8400167.html