漫谈流式计算的一致性

参考,

http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/

http://www.confluent.io/blog/real-time-stream-processing-the-next-step-for-apache-flink/

 

image

对于batch分析,fault-tolerant很容易做,失败只需要replay,就可以完美做到容错。

对于streaming分析, 数据流本身是动态,没有所谓的开始或结束,虽然可以replay buffer的部分数据,但fault-tolerant做起来会复杂的多

当前主流的一些streaming分析平台,都有一些各自特有的fault-tolerant的机制,在此分析和总结一下,

无状态流数据处理,

这是种比较简单的流式数据的场景,典型的应用是数据ETL,数据存储,数据流过是没有状态的

保证at least once语义,
分钟级别,Storm的acker机制,就可以很好的保证, http://storm.apache.org/documentation/Guaranteeing-message-processing.html
message没有被正确处理,收到ack时,可以选择重发,这样每条message对可以保证被处理到,但可能会被重复处理

小时,天级别,利用kafka的replay,一般达到天级别的cache

保证exactly once语义,
对于无状态数据流,其实只要依赖最终存储的去重性(deduplication), 就可以达到exactly once
比如对于数据库,通过unique key和insert ignore就可以解决这个问题,无论你之前重复处理多少次,最终我只存储一次。

如果最终存储不支持去重,或者场景比较复杂不仅仅是存储,比如做叠加计数 或 update
做叠加计数,当前的机制,你无法知道这个message是否加过
做update的时候,更新的时序性很重要,这个是ack机制无法保证的

Storm 0.7就提供transactional topology特性,http://storm.apache.org/documentation/Transactional-topologies.html

首先给message加上transaction id,这样有两个好处,可以保证时序性,在写入存储的时候,可以按transaction id顺序写入
并且在可以外部存储上记录当前最新的transaction id,保证相同的transaction,不会被重复写入
这个是transactional topology的核心思路,这样确实是可以保证强一致性,exactly once语义
但这个方案只适用于无状态,或是依赖外部存储的,状态必须要存储在外部存储上

至于使用batch,或将topology分为processing和commit阶段,都是对性能的优化,并不会提升一致性的保障
但由于使用micro-batch是必须的,所以也称这类方案是micro-batch方案,除了transactional topology,还有Apache Spark Streaming
micro-batch的坏处,
1. 改变编程模型,伪流式
2. windows based聚合的限制,只能是micro-batch的倍数,比如micro-batch是3分钟,你想做个5分钟聚合,没法做
2. 延迟变大,如果本身秒级别,但如果micro-batch是1分钟,那延迟就至少1分钟

有状态流数据处理,

典型的场景,就是windows-based的聚合或计算,比如计算1分钟内的计数或平均值,这样会有部分数据需要cache在内存中
这样当fail-over时,如何可以恢复cache,并保证exactly once语义

最直接的想法,

局部的snapshot

每个component对cache定期做snapshot,然后在fail-over后,各自恢复自己的cache,
这样做的问题,
1. snapshot很难增量做,如果cache比较大,成本会比较高
2. snapshot只能定期做,会有部分丢失
3. 最关键的,对于分布式系统,各个compoent独立的进行snapshot,很难达到同一个状态,每个component的处理速度都是不一样的,有的处理到n做了snapshot,而有的可能做到n+1才做,
缺乏一个统一的参照系。

 

change-log
每个 component,当接收到一个 message 的时候,产生一条 change log 记录该 message 和更新的状态,存入 transactional log 和数据库
当做 fail-over 的时候,只需要每个 component 将数据库中的 log,拿出来 replay 即可
这种方式使用的平台如 Google Cloud Dataflow,Apache Samza

对于 Apache Samza,会将 change log 放入kafka中,

image

当fail-over后,每个task从相应的kafka topic里面读出change-log,完成local state的replay

这样做的好处,是不用直接去snapshot local cache,如果cache比较大的话,这样是比较划算的
但是如果数据流很big的话,这样做也不合适了,因为change-log会非常大

 

Distributed Snapshots (Apache Flink),全局的 snapshot

针对前面提到的局部 snapshot 最关键的问题,提出全局 snapshot 的方法,
其实最大的问题仍然是分布式系统的根本问题,统一参照系的问题,如何让每个 component 在同一的状态下,进行 snapshot

这个原理来自 Chandy and Lamport, 1985,的paper “Distributed Snapshots: Determining Global States of Distributed Systems”

http://blog.acolyer.org/2015/04/22/distributed-snapshots-determining-global-states-of-distributed-systems/

局部的snapshot会有的问题,

状态丢失,如下图,但状态中传输的时候,对P和Q进行snapshot,会导致队列中的绿蓝橙状态丢失

image

状态重复,brown状态中P和Q的snapshot里面同时出现

image

怎么解这样的问题?分布式系统中缺乏统一参照系的情况下,只有通过通信才能确定偏序的问题
所以这里使用marker来做组件间的同步,并防止丢失状态,会同时对组件,以及队列同时做snapshot, 如下图

image

P做snapshot,然后发送marker到Q
Q收到marker的时候,知道P做了snapshot,那么我也要做snapshot
同时还要对PQ channel做snapshot,此时channel中有个green,但是由于green是在marker后面的,说明它在P的snapshot里面已经做过,不需要再做,所以此时PQ的snapshot为空
Q在做完snapshot后,还需要把marker返回给P,因为在过程中orange从Q被发送到P
当P收到Q返回的marker时,由于P的snapshot已经做过,无法改变
所以把orange放在QP channel的snapshot中

最终做出的全局的snapshot为,

P(red, green, blue)
channel PQ ()
Q(brown, pink)
channel QP (orange)
这样就解决了状态丢或重复的问题

 

Flink’s distributed snapshotting实现基于stream barriers

image

可见,barrier可以将流拆分成一段段的数据,每个barrier都是一个snapshot点,但是这种拆分不同于micro-batch,并不会影响到正常的流式处理
在DAG,即有向无环图的case下,是不需要对channel做snapshot的,场景会比较简单
只是每个组件收到barrier的时候去做snapshot就好,该算法的几个前提:
1. 网络可靠,消息FIFO;
2. channel可以block,unblock,支持对所有output channel进行广播
3. 可自动识别注入的barrier

完成过程如图,这是个有两条入边的case,相对复杂些
当收到一条channel的barrier时,需要先block该channel,然后等待另一个channel中的barrier
当两条channel的barrier都到达时,说明达到统一状态,进行checkpoint
然后unblock之前block的channel,并对所有的output channel广播该barrier

image

当DAG上的所有组件都完成snapshot时,那么一个全局的snapshot就完成了,以barrier为唯一标识

比较抽象,下图以kafka为例子解释一下,https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html

image

对于kafka而言,不同的partition需要不同的线程读,
图中,4个source thread分别从4个partition读取数据
其中由唯一的master来发起checkpoint流程,
过程是,
1. Master给所有的source thread发checkpoint请求
2. source thread接收到cp请求后,会记录当前的offset,比如5791,并做该offset的message前发出streaming barrier
    并将offset返回给master

3. 这样master收到所有source的ack offset,就相当于对source做了snapshot,恢复时只需要将相应的source置到该offset即可
4. 中间每个组件,当收到所有input channel的barrier时,将cp存入数据库,并通知Master
5. 层层下去,直到所有Sink节点,最终节点,完成snapshot

6. master接收到所有节点的做完cp的ack,知道这次checkpoint全部完成

这个方案的最大的问题是,当多个input channel时,需要等所有的barrier到齐,这个明显会增加latency
Flink的优化是,不等,看到barrier就打snapshot,这样的问题就是无法保证exactly once,会重复,
因为后来的barrier打checkpoint时会覆盖先前的cp,
此时barrier先到的channel已经处理了一些barrier之后的数据,这部分结果也会存在cp中

但当fail-over的时候,因为replay是根据你发送barrier的offset来重发的,所以这部分会重复

原文地址:https://www.cnblogs.com/fxjwind/p/4975389.html