storm学习笔记

概述

 公司之前使用了strom框架来进行实时计算,现在总结一下之前的知识和经验,如有不足之处,望广大网友及时指正,不胜感激。

简而言之:Storm是一个分布式的,可靠的,容错的数据流处理系统。Storm集群的输入流由一个被称作spout的组件管理,spout把数据传递给bolt, bolt要么把数据保存到某种存储器,要么把数据传递给其它的bolt。一个Storm集群就是在一连串的bolt之间转换spout传过来的数据。

STORM组件

在Storm集群中,有两类节点:主节点master node和工作节点worker nodes。主节点运行 Nimbus守护进程,这个守护进程负责在集群中分发代码,为工作节点分配任务,并监控故障。Supervisor守护进程作为拓扑的一部分运行在工作节 点上。一个Storm拓扑结构在不同的机器上运行着众多的工作节点。每个工作节点都是topology中一个子集的实现。而Nimbus和 Supervisor之间的协调则通过Zookeeper系统或者集群。

zookeeper:

Zookeeper是完成Supervisor和Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装进Storm中的“topology”。 topology则是一组由Spouts(数据源)和Bolts(数据操作)通过Stream Groupings进行连接的图。

spout:

Spout从来源处读取数据并放入topology。Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组, 数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。而Spout中最主要的方法就是nextTuple(),该方法会发 射一个新的tuple到topology,如果没有新tuple发射则会简单的返回。

bolt:

Topology中所有的处理都由Bolt完成。Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将tuple发送给另一个 Bolt进行处理。而Bolt中最重要的方法是execute(),以新的tuple作为参数接收。不管是Spout还是Bolt,如果将tuple发射 成多个流,这些流都可以通过declareStream()来声明。

Stream Groupings:

Stream Grouping定义了一个流在Bolt任务中如何被切分。

1. Shuffle grouping:随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。

2.Fields grouping:根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。

3. Partial Key grouping:根据指定字段分割数据流,并分组。类似Fields grouping。

4.All grouping:tuple被复制到bolt的所有任务。这种类型需要谨慎使用。

5. Global grouping:全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。

6. None grouping:无需关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。

7. Direct grouping:这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。

8. Local or shuffle grouping:如果目标bolt有一个或多个任务在同一工作进程,tuples 会打乱这些进程内的任务。否则,这就像一个正常的 Shuffle grouping。

STORM原理

Storm称用户的一个作业为Topology(拓扑),为什么叫拓扑呢?是因为Storm的一个拓扑主要包含了许多的数据节点,还有一些计算节点,以及这些节点之间的边,也就是说Storm的拓扑是由这些点和边组成的一个有向无环图。这些点有两种:数据源节点(Spout)、普通的计算节点(Bolt),点之间的边称为数据流(Stream),数据流中的每一条记录称为Tuple。

如下图中,每一个“水龙头”表示一个Spout,它会发送一些Tuple给下游的Bolt,这些Bolt经过处理周,再发送一个Tuple给下一个Bolt,最后,在这些Bolt里面是可以执行一些写数据到外部存储(如数据库)等操作的。在图中这个Topology里面我们看到了两个Spout和5个Bolt,在实际运行的时候,每个Spout节点都可能有很多个实例,每个Bolt也有可能有很多个实例。就像MapReduce一样,一个Map节点并不代表只有一个并发,而有可能很多个Map实例在跑。

这些Spout和Bolt的这些边里面,用户可以设置多种的Grouping的方式。有些类似SQL中的Group By。用来制定这些计算是怎么分组的。

 *Fields Grouping:保证同样的字段移动落到同一个Bolt里,

Topologies

为了在storm上面做实时计算, 你要去建立一些topologies。一个topology就是一个计算节点所组成的图。Topology里面的每个处理节点都包含处理逻辑, 而节点之间的连接则表示数据流动的方向。运行一个Topology是很简单的。首先,把你所有的代码以及所依赖的jar打进一个jar包。然后运行类似下面的这个命令。strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2这个命令会运行主类: backtype.strom.MyTopology,参数是arg1, arg2。这个类的main函数定义这个topology并且把它提交给Nimbus。storm jar负责连接到nimbus并且上传jar文件。

stream

Stream是storm里面的关键抽象。一个stream是一个没有边界的tuple序列。

storm提供一些原语来分布式地、可靠地把一个stream传输进一个新的stream。比如: 你可以把一个tweets流传输到热门话题的流。

storm提供的最基本的处理stream的原语是spout和bolt。你可以实现Spout和Bolt对应的接口以处理你的应用的逻辑。

spout是流的源头。比如一个spout可能从Kestrel队列里面读取消息并且把这些消息发射成一个流。

又比如一个spout可以调用twitter的一个api并且把返回的tweets发射成一个流。

通常Spout会从外部数据源(队列、数据库等)读取数据,然后封装成Tuple形式,之后发送到Stream中。

Spout是一个主动的角色,在接口内部有个nextTuple函数,Storm框架会不停的调用该函数。

bolt可以接收任意多个输入stream, 作一些处理, 有些bolt可能还会发射一些新的stream。

一些复杂的流转换, 比如从一些tweet里面计算出热门话题, 需要多个步骤, 从而也就需要多个bolt。

Bolt可以做任何事情: 运行函数,过滤tuple,做一些聚合,做一些合并以及访问数据库等等。

Bolt处理输入的Stream,并产生新的输出Stream。

Bolt可以执行过滤、函数操作、Join、操作数据库等任何操作。

Bolt是一个被动的角色,其接口中有一个execute(Tuple input)方法,在接收到消息之后会调用此函数,用户可以在此方法中执行自己的处理逻辑。

 

spout和bolt所组成一个网络会被打包成topology, topology是storm里面最高一级的抽象(类似 Job), 你可以把topology提交给storm的集群来运行。

topology的结构在Topology那一段已经说过了,这里就不再赘述了。

topology里面的每一个节点都是并行运行的。 在你的topology里面, 你可以指定每个节点的并行度, storm则会在集群里面分配那么多线程来同时计算。

一个topology会一直运行直到你显式停止它。storm自动重新分配一些运行失败的任务, 并且storm保证你不会有数据丢失, 即使在一些机器意外停机并且消息被丢掉的情况下。

数据模型(Data Model)

storm使用tuple来作为它的数据模型。每个tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型,
在我的理解里面一个tuple可以看作一个没有方法的java对象(或者是一个表的字段)。
总体来看,storm支持所有的基本类型、字符串以及字节数组作为tuple的值类型。你也可以使用你自己定义的类型来作为值类型, 只要你实现对应的序列化器(serializer)。
一个Tuple代表数据流中的一个基本的处理单元,例如一条cookie日志,它可以包含多个Field,每个Field表示一个属性。

Tuple本来应该是一个Key-Value的Map,由于各个组件间传递的tuple的字段名称已经事先定义好了,所以Tuple只需要按序填入各个Value,所以就是一个Value List。

一个没有边界的、源源不断的、连续的Tuple序列就组成了Stream。

 topology里面的每个节点必须定义它要发射的tuple的每个字段。 比如下面这个bolt定义它所发射的tuple包含两个字段,类型分别是: double和triple。

 1     public class DoubleAndTripleBolt implements IRichBolt {
 2         private OutputCollectorBase _collector;
 3 
 4         @Override
 5         public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
 6             _collector = collector;
 7         }
 8 
 9         @Override
10         public void execute(Tuple input) {
11             intval = input.getInteger(0);
12             _collector.emit(input,newValues(val*2, val*3));
13             _collector.ack(input);
14         }
15 
16         @Override
17         public void cleanup() {
18         }
19 
20         @Override
21         public void declareOutputFields(OutputFieldsDeclarer declarer) {
22             declarer.declare(newFields("double","triple"));
23         }
24     }
declareOutputFields方法定义要输出的字段 : ["double", "triple"]。这个bolt的其它部分我们接下来会解释。

STORM和其他工具对比

目前比较流行的实时处理引擎有 Storm,Spark Streaming,Flink。每个引擎都有各自的特点和应用场景。 下表是对这三个引擎的简单对比。

 strom安装比较简单,这里省略安装步骤,安装成功如图所示:

STORM原理

运行中的Topology主要由以下三个组件组成的。

Task数量:表示每个Spout或Bolt逻辑上有多少个并发。它影响输出结果。

Worker数量:代表总共有几个JVM进程去执行我们的作业。

Executor数量:表示每个Spout或Bolt启动几个线程来运行

 下面代码中的数字表示Executor数量,它不影响结果,影响性能。

Worker的数量在Config中设置,下图代码中的部分表示Worker数量。

*本地模式中,Worker数不生效,只会启动一个JVM进行来执行作业。

*只有在集群模式设置Worker才有效。而且集群模式的时候一定要设置才能体现集群的价值。

 数据可靠性:

(1)Spout容错API:NextTuple中,emit时,指定MsgID。

(2)Bolt容错API:①emit时,锚定输入Tuple。②Act输入Tuple。

STORM参数设置

storm.zookeeper.servers:

ZooKeeper服务器列表

storm.zookeeper.port:

ZooKeeper连接端口

storm.local.dir:

storm使用的本地文件系统目录(必须存在并且storm进程可读写)

storm.cluster.mode:

Storm集群运行模式([distributed|local])

storm.local.mode.zmq:

Local模式下是否使用ZeroMQ作消息系统,如果设置为false则使用java消息系统。默认为false

storm.zookeeper.root:

ZooKeeper中Storm的根目录位置

storm.zookeeper.session.timeout:

客户端连接ZooKeeper超时时间

storm.id:

运行中拓扑的id,由storm name和一个唯一随机数组成。

nimbus.host:

nimbus服务器地址

nimbus.thrift.port:nimbus的thrift监听端口

nimbus.childopts:

通过storm-deploy项目部署时指定给nimbus进程的jvm选项

nimbus.task.timeout.secs:

心跳超时时间,超时后nimbus会认为task死掉并重分配给另一个地址

nimbus.monitor.freq.secs:

nimbus检查心跳和重分配任务的时间间隔。注意如果是机器宕掉nimbus会立即接管并处理

nimbus.supervisor.timeout.secs:

supervisor的心跳超时时间,一旦超过nimbus会认为该supervisor已死并停止为它分发新任务

nimbus.task.launch.secs:

task启动时的一个特殊超时设置。在启动后第一次心跳前会使用该值来临时替代nimbus.task.timeout.secs

nimbus.reassign:

当发现task失败时nimbus是否重新分配执行。默认为真,不建议修改

nimbus.file.copy.expiration.secs:

nimbus判断上传/下载链接的超时时间,当空闲时间超过该设定时nimbus会认为链接死掉并主动断开

ui.port:

Storm UI的服务端口

drpc.servers:

DRPC服务器列表,以便DRPCSpout知道和谁通讯

drpc.port:

Storm DRPC的服务端口

supervisor.slots.ports:

supervisor上能够运行workers的端口列表。每个worker占用一个端口,且每个端口只运行一个worker。

通过这项配置可以调整每台机器上运行的worker数。(调整slot数/每机)

supervisor.childopts:

在storm-deploy项目中使用,用来配置supervisor守护进程的jvm选项

supervisor.worker.timeout.secs:

supervisor中的worker心跳超时时间,一旦超时supervisor会尝试重启worker进程.

supervisor.worker.start.timeout.secs:

supervisor初始启动时,worker的心跳超时时间,当超过该时间supervisor会尝试重启worker。

因为JVM初始启动和配置会带来的额外消耗,从而使得第一次心跳会超过supervisor.worker.timeout.secs的设定

supervisor.enable:

supervisor是否应当运行分配给他的workers。默认为true,该选项用来进行Storm的单元测试,一般不应修改.

supervisor.heartbeat.frequency.secs:

supervisor心跳发送频率(多久发送一次)

supervisor.monitor.frequency.secs:

supervisor检查worker心跳的频率

worker.childopts:

supervisor启动worker时使用的jvm选项。所有的”%ID%”字串会被替换为对应worker的标识符

worker.heartbeat.frequency.secs:

worker的心跳发送时间间隔

task.heartbeat.frequency.secs:

task汇报状态心跳时间间隔

task.refresh.poll.secs:

task与其他tasks之间链接同步的频率。(如果task被重分配,其他tasks向它发送消息需要刷新连接)

。一般来讲,重分配发生时其他tasks会理解得到通知。该配置仅仅为了防止未通知的情况。

topology.debug:

如果设置成true,Storm将记录发射的每条信息。

topology.optimize:

master是否在合适时机通过在单个线程内运行多个task以达到优化topologies的目的

topology.workers:

执行该topology集群中应当启动的进程数量。

每个进程内部将以线程方式执行一定数目的tasks。topology的组件结合该参数和并行度提示来优化性能

topology.ackers:

topology中启动的acker任务数。

Acker保存由spout发送的tuples的记录,并探测tuple何时被完全处理。

当Acker探测到tuple被处理完毕时会向spout发送确认信息。通常应当根据topology的吞吐量来确定acker的数目,但一般不需要太多。

当设置为0时,相当于禁用了消息可靠性。storm会在spout发送tuples后立即进行确认

topology.message.timeout.secs:

topology中spout发送消息的最大处理超时时间。

如果一条消息在该时间窗口内未被成功ack,Storm会告知spout这条消息失败。而部分spout实现了失败消息重播功能。

topology.kryo.register:

注册到Kryo(Storm底层的序列化框架)的序列化方案列表。序列化方案可以是一个类名,或者是com.esotericsoftware.kryo.Serializer的实现

topology.skip.missing.kryo.registrations:

Storm是否应该跳过它不能识别的kryo序列化方案。如果设置为否task可能会装载失败或者在运行时抛出错误

topology.max.task.parallelism:

在一个topology中能够允许的最大组件并行度。该项配置主要用在本地模式中测试线程数限制.

topology.max.spout.pending:

一个spout task中处于pending状态的最大的tuples数量。该配置应用于单个task,而不是整个spouts或topology

topology.state.synchronization.timeout.secs:

组件同步状态源的最大超时时间(保留选项,暂未使用)

topology.stats.sample.rate:

用来产生task统计信息的tuples抽样百分比

topology.fall.back.on.java.serialization:

topology中是否使用java的序列化方案

zmq.threads:

每个worker进程内zeromq通讯用到的线程数

zmq.linger.millis:

当连接关闭时,链接尝试重新发送消息到目标主机的持续时长。这是一个不常用的高级选项,基本上可以忽略.

java.library.path:

JVM启动(如Nimbus,Supervisor和workers)时的java.library.path设置。该选项告诉JVM在哪些路径下定位本地库。

STORM一个简单例子

先看一个简单例子:

1 TopologyBuilder builder = new TopologyBuilder();
2 
3         builder.setSpout("spout", new RandomSentenceSpout(), 5);
4 
5         builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
6         builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

这个Topology包含一个Spout和两个Bolt。Spout发射单词。这三个节点被排成一条线: spout发射单词给第一个bolt, 第一个bolt然后把处理好的单词发射给第二个bolt。

我们使用setSpout和setBolt来定义Topology里面的节点。这些方法接收我们指定的一个id, 一个包含处理逻辑的对象(spout或者bolt), 以及你所需要的并行度。

这个包含处理的对象如果是spout那么要实现IRichSpout的接口, 如果是bolt,那么就要实现IRichBolt接口.

最后一个指定并行度的参数是可选的。它表示集群里面需要多少个thread来一起执行这个节点。如果你忽略它那么storm会分配一个线程来执行这个节点。

setBolt方法返回一个InputDeclarer对象, 这个对象是用来定义Bolt的输入。 这里第一个Bolt声明它要读取spout所发射的所有的tuple — 使用shuffle grouping。而第二个bolt声明它读取第一个bolt所发射的tuple。shuffle grouping表示所有的tuple会被随机的分发给bolt的所有task。

STORM测试代码

没有代码的技术文档总感觉少些说服性,废话不说,测试代码,使用了storm中example的代码中的kafka部分,提交命令成功截图是:

 

 从上图可知一共提交了4个拓扑,现在在UI界面 讲解一下每个页面的作用和说明:

1,首页运行界面可以看到Topology的概述,包括拓扑name,状态,:

Name: topology name

id: topology id (由storm生成)

status: topology的状态,包括(ACTIVE, INACTIVE, KILLED, REBALANCING)

uptime: topology运行的时间

num workers: 运行的workers数

num tasks: 运行的task数

2,进入其中一个拓扑展示界面是:

【topology stats】

window: 时间窗口,显示10m、3h、1d和all time的运行状况

emitted: emitted tuple数

transferred: transferred tuple数, 说下与emitted的区别:如果一个task,emitted一个tuple到2个task中,则transferred tuple数是emitted tuple数的两倍

complete latency: spout emitting 一个tuple到spout ack这个tuple的平均时间

acked: ack tuple数

failed: 失败的tuple数

【spouts】

id: spout id

parallelism: 任务数

last error: 最近的错误数,只显示最近的前200个错误

emitted、transferred、complete latency、acked和failed上面已解释

【bolts】

process latency: bolt收到一个tuple到bolt ack这个tuple的平均时间

其他参数都解释过了

还有componentpage和taskpage,参数的解释同上。

taskpage中的Component指的是spoutid或者boltid,time指的是错误发生的时间,error是指错误的具体内容。

STORM优化

kryo序列化

定制序列化

自定义的bolt之间emit数据是实体类的时候,注册kryo

Storm 使用 Kryo 来处理序列化。如果要实现自定义的序列化生成器,需要注册一个新的 Kryo 的序列化生成器。

可以通过拓扑的 topology.kryo.register 属性来添加自定义序列化生成器。两种姿势:

只有一个待注册的类的名称。在这种情况下,Storm 会使用 Kryo 的 FieldsSerializer 来序列化该类。conf.registerSerialization(UserEntity.class);

一个包含待注册的类的名称和对应的序列化实现类名称,该序列化实现类实现了 com.esotericsoftware.kryo.Serializer接口。

topology.kryo.register:
  - com.ly.CustomType1
  - com.ly.CustomType2: com.ly.serializer.CustomType2Serializer
  - com.ly.CustomType3

KafkaBolt批量提交

1 props.put("request.required.acks", "0");
2 props.put("producer.type", " async");
3 props.put("message.send.max.retries","3");
4 props.put("batch.num.messages", "200");
5 props.put("send.buffer.bytes", "1024*100");

使用组件的并行度代替线程池

Storm 自身是一个分布式、多线程的框架,对每个Spout 和Bolt,我们都可以设置其并发度;它也支持通过rebalance 命令来动态调整并发度,把负载分摊到多个Worker 上。如果自己在组件内部采用线程池做一些计算密集型的任务,比如JSON 解析,有可能使得某些组件的资源消耗特别高,其他组件又很低,导致Worker 之间资源消耗不均衡,这种情况在组件并行度比较低的时候更明显。
如某个Bolt 设置了1 个并行度,但在Bolt 中又启动了线程池,这样导致的一种后果就是,集群中分配了这个Bolt 的Worker 进程可能会把机器的资源都给消耗光了,影响到其他Topology 在这台机器上的任务的运行。如果真有计算密集型的任务,我们可以把组件的并发度设大,Worker 的数量也相应提高,让计算分配到多个节点上。

注意fieldsGrouping 的数据均衡性

fieldsGrouping 是根据一个或者多个Field 对数据进行分组,不同的目标Task 收到不同的数据,而同一个Task 收到的数据会相同。
假设某个Bolt 根据用户ID 对数据进行fieldsGrouping,如果某一些用户的数据特别多,而另外一些用户的数据又比较少,那么就可能使得下一级处理Bolt 收到的数据不均衡,整个处理的性能就会受制于某些数据量大的节点。可以加入更多的分组条件或者更换分组策略,使得数据具有均衡性。

优先使用localOrShuffleGrouping

localOrShuffleGrouping 是指如果目标Bolt 中的一个或者多个Task 和当前产生数据的Task 在同一个Worker 进程里面,那么就走内部的线程间通信,将Tuple 直接发给在当前Worker 进程的目的Task。否则,同shuffleGrouping。localOrShuffleGrouping 的数据传输性能优于shuffleGrouping,因为在Worker 内部传输,只需要通过Disruptor 队列就可以完成,没有网络开销和序列化开销。因此在数据处理的复杂度不高, 而网络开销和序列化开销占主要地位的情况下,可以优先使用localOrShuffleGrouping 来代替shuffleGrouping。

设置合理的Worker 数

Worker是运行在工作节点上面,被Supervisor守护进程创建的用来干活的JVM进程。每个Worker对应于一个给定topology的全部执行任务的一个子集。反过来说,一个Worker里面不会运行属于不同的topology的执行任务。它可以通过[storm rebalance]命令任意调整。

Worker 数越多,性能越好?先看一张Worker 数量和吞吐量对比的曲线

从图可以看出,在12 个Worker 的情况下,吞吐量最大,整体性能最优。这是由于一方面,每新增加一个Worker 进程,都会将一些原本线程间的内存通信变为进程间的网络通信,这些进程间的网络通信还需要进行序列化与反序列化操作,这些降低了吞吐率。
另一方面,每新增加一个Worker 进程,都会额外地增加多个线程(Netty 发送和接收线程、心跳线程、SystemBolt 线程以及其他系统组件对应的线程等),这些线程切换消耗了不少CPU,sys 系统CPU 消耗占比增加,在CPU 总使用率受限的情况下,降低了业务线程的使用效率。

设置合理的Executor数

可以理解成一个Worker进程中的工作线程。

一个Executor中只能运行隶属于同一个component(spout/bolt)的task。

一个Worker进程中可以有一个或多个Executor线程。在默认情况下,一个Executor运行一个task。

每个component(spout/bolt)的并发度就是指executor数量。

它可以通过[storm rebalance]命令任意调整。

设置合理的Task数

Task则是spout和bolt中具体要干的活了。一个Executor可以负责1个或多个task。

同时,task也是各个节点之间进行grouping(partition)的单位。无法在运行时调整。

--设置方法:

conf.setNumWorkers(workers);                                        //设置worker数量

uilder.setBolt("2", new WordSpliter(),4)                             //设置Executor并发数量

builder.setBolt("2", new WordSpliter(),4).setNumTasks(1); //设置每个线程处理的Task数量

--任务分配:

任务分配是有下面两种情况:

①、task数目比worker多:

例如task是[1 2 3 4],可用的slot(所谓slot就是可用的worker)只有[host1:port1,host2:port1],那么最终是这样分配
1:[host1:port1]

2:[host2:port1]

3:[host1:port1]

4:[host2:port1]

②、task数目比worker少:

例如task是[1 2],而worker有[host1:port1,host1:port2,host2:port1,host2:port2],

那么首先会将woker排序,将不同host间隔排列,保证task不会全部分配到同一个机器上,也就是将worker排列成

[host1:port1,host2:port1,host1:port2,host2:port2]

然后分配任务为:

1:[host1:port1]

2:[host2:port1]

总结

感谢网络大神的分享:

https://blog.csdn.net/wangshuminjava/article/details/79367254

https://www.aboutyun.com//forum.php%5C?mod=viewthread&tid=7394&extra=page%3D1&page=1&

https://github.com/quchunhui/StormWordCount/blob/master/src/main/java/WordCountTopology.java

https://www.jianshu.com/p/1d17155c54c9

 

原文地址:https://www.cnblogs.com/boanxin/p/12180447.html