strom基础概念

一、Storm topology提交到集群分析 

   storm目前1.x版本支持nimbus的高可用(其实也可以不需要高可用,因为nimbus是无状态的,只要运行的topology没有故障且没有新的任务需要提交到storm集群,那么也可以不要nimbus,因为是worker在运行任务,nimbus只是负责任务分配,资源调度且和supervisor保持心跳我们可以做好整个集群的监控即可,当nimbus挂了后,直接重启它,不会影响正在运行的topology)。
    当我们将topology提交到storm集群的时候,如果你搭的环境是一套高可用的环境,首先需要找到leader nimbus节点,因为需要向leader节点提交我们的拓扑,当调用submitTopology的时候,首先会进行相关的配置校验,然后找到配置的nimbus的ip,循环找到为leader的ip地址将其构建成一个NimbusClient返回,然后开始提交jar,我们可以把这个过程抽象成,我们是thift client通过RPC向server提交jar,jar上传完成后,通知Nimbus上传任务已经完成,nimbus接收到jar后将jar重命名后保存到inbox目录下,然后进行配置检查,任务分配。
   nimbus首先会检查它的配置信息,以及整个storm集群中可用的slots(可用理解为worker),
   第一步:检查整个集群中可用的worker,然后根据配置的worker数分配
   第二步:在zk中创建任务的心跳检测节点/storm/workbeats/,storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。任务将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。
   第三步:开始分配任务,根据topology配置的task数和worker数,进行分配
     3.1 如果task数目比worker数多
          比如task 为4个,worker为2个
          task1,task2,taks3,task4
          worker1,woker2, 那么效果就是task1到worker1,task2到woker2,这样依次轮询
     3.2 如果worker数比task数目多
         比如task5个,worker 8 个 基本会轮询保证task不会全部分配到同一个worker上
   第四步:supervisor下载分配给自己的topology下载完成后然后运行。


二、高可用测试


       storm nimbus的单点问题解决了在1.版本,主要理由zk的Ledaer选举实现,其实主要是依靠分布式互斥锁来实现,我们可以将zk的一个数据节点代表一个锁,当多个客户端同时调用create()节点创建节点的时候,zookeeper会保证只会有一个客户端创建成功,那么我们就可以让这个创建成功的客户端让其持有锁,而其它的客户端则注册Watcher监听,当持有锁的客户端释放锁后,监听的客户端就会收到Watcher通知,然后再去试图获取锁。
   1、直接停掉nimbus leader :已经运行的任务不会有任何问题  
  2、任务启动过程中,立刻停掉nimbus leader 
     如果此时新Leader还没选举出来,任务提交会失败
     如果新的Leader已经产生了,任务提交成功
   3、关闭worker所在机器的supervisor守护进程(默认3秒检测一次worker进程)--> Kill -9 $workerPid ---> 关闭Nimbus Leader --->worker没有重新启动/飘走-->1分钟后启动supervisor --->worker飘移走
   4、storm的高可用利用分布式缓存API来实现数据的备份,实现文件在多个topology之间的共享。


三、storm的主要特点


       1、水平扩展:可以直接通过加机器来提高整个集群处理任务的速率,不仅可以直接添加机器作为supervisor节点,而且可以动态调整并发度来改变运行某个topology的线程数
    2、容错性比较好:当Nimbus挂了,可以通过高可用来选举出一个新的Leader(当然nimbus挂了也不是什么大问题,因为运行任务不在nimbus节点上),当supervisor挂了,nimbus会通过心跳检测到,然后会将运行在这个supervisor节点上的所有任务转移到其它节点上,当worker挂了,即运行任务的工作进程挂了,首先supervisor会尝试重启它,如果没启动成功,那么nimbus就会将运行在worker上的任务转移到其它可用的worker节点上运行。
    3、消息的可靠性处理:storm默认提供3种类型,
      仅仅一次:原生storm api无法实现,需要用的storm 高级部分trident实现 
      最多一次:也就是消息最多只发一次,不管这条消息有没有被成功处理
      至少一次:消息如果处理失败后,会重发
    4、节点的无状态:状态都保存在zk里面,当我们使用kill -9 $nimbus_pid的时候,不影响整个集群的运行。

四、运行Storm的相关术语

      1、topology:拓扑,运行一组spout/bolt构成的计算逻辑组件的总称
   2、spout:storm消息来源,一般的话是消息队列
   3、bolt:处理消息逻辑的组件,它的上游可以是spout也可以是Bolt
   4、tuple:运行的消息单元,比如句子 zhangsan is a man 这个消息就是一个tuple
   5、Stream:流,由一条条消息组织的,抽象成流
   6、Stream Grouping:流分组,定义各个计算组件之间流的连接,分组,分发策略等


五、看一个简单的例子(测试storm的容错性)


      从消息队列Kafka接收消息,然后进行单词拆分,然后统计单词出现的次数.起一个线程从消息队列收消息,3个线程进行单词拆分,1个线程进行全局计数
    1、如果nimbus挂了怎么办?
    2、如果supervisor 挂了怎么办?
    3、如果worker挂了怎么办?
    4、节点挂了怎么办?
    第一个问题:nimbus是无状态的,nimbus挂了不影响整个集群中已经在运行的topology的运行。
   第二个问题:supervisor守护进程挂了,没有影响。
   第三个问题:worker挂了怎么办? supervisor会尝试重启,如果没有启动成功,则发生worker转移,转移到其它可用的节点上运行。
   第四个问题:Node节点挂了怎么办? 这个实际上也可用理解为worker挂了,也会发生worker转移。


六、storm的并发度


      一个topology可用跑在多个worker进程上,自然可用想的在一个进程里面,我们可以跑多个线程,再一次加快数据的处理效率,默认情况下,如果我们不设置线程数,就是1,且默认情况下,一个线程对应一个task任务(task可以理解为spout/bolt实例)
   其中,每一个worker默认起一个acker,也就是说每一个worker默认起一个线程来执行acker,而acker实际上就是一个特殊的bolt,所以在计算整个集群线程数的时候,需要加上acker


七、流分组


       流分组定义了上层tuple如何路由到下一层tuple,storm里面默认有以下几种。注意,首先下层必须有多个线程,不然一个线程定义流分组没有意义。
   1、shuffleGrouping : 轮询
   2、filedGrouping:按照上一层的输出属性,相同属性的会被路由到同一个线程处理
   3、allGrouping:广播分组,对于每一个tuple,所有的Bolt都会收到
   4、globalGrouping:全局分组,只会到bolt里面其中task-id最小的那一个
   5、noGrouping:不分组,随机
   6、DirectGrouping:直接分组上一层可以直接指定想把该消息路由到下一层的哪个bolt
   7、自定义分组
   8、localorshuffleGrouping:如果上层bolt和下层bolt运行在同一个进程里面,优先在进程内通信,使用Disruptor有界队列


八、storm的可靠性测试

  8.1: 消息的ack

      当一个spout数据源发射了消息后,一个tuple经过了所有的Bolt处理之后,这个tuple才被认为是处理完了,而acker就是一个用来追踪这些消息的组件,
acker内部类似于一个map,维护了消息id和顶层spoutid直接的关系 
    1、如果task挂了,一个tuple没有被ack,那么会在超时之后(30s)spout负责重发
    2、如果acker挂了,那么由这个tuple追踪的所有的tuple都会超时,也会被重发
    3、那么spout挂了,第三方消息源负责重发

   8.2 如何实现可靠的spout

     storm的bolt和spout有很多接口和抽象类,我们可以实现ISpout接口,首先需要注意的是spout里面,nextTuple,ack,fail是运行在同一个线程里面,所有不要在nextTuple里面执行一些比较耗时的处理,默认30s超时,如果acker-30s还没有收到那么就会超时重发,且nextTuple不能阻塞,如果没有消息发射,他会sleep一下,释放cpu,而如果我们想实现可靠的spout,需要自己维护msgId和消息直接的对应关系,可以放在map,redis里面,并且自己实现fail函数进行消息重发。

   8.3 如何实现可靠的bolt

    storm提供两种不同类型的 Bolt,分别是 BaseRichBolt 和 BaseBasicBolt都可以实现消息的可靠性处理,其中BaseBasicBolt,storm已经帮我们内部自己实现了消息的可靠处理,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack 。
   而BaseRichBolt则需要我们自己维护msgid和tuple之间的关系并手动实现ack或者fail
   下面来分析一下BaseBasicBolt是怎么实现了,当我们调用emit的时候,由于没有streamId和tuple之间的对应关系,storm会给我们自己生成一个默认的流id,最后会起一个BasicBoltExecutor,这里面的excetue方法自己给我们实现了ack和失败后的fail

   8.4 针对storm提交的3种消息保证语义,在ack的基础上来看一下如何实现

    8.4.1、如何实现最多一次(下面几个条件只要有一个满足,都只能实现最多一次语义)
       1)将ack的个数设置为0
       2)Spout不实现可靠的消息处理
          不带msgId或者不实现fail函数
       3)bolt不把处理的消息发送给acker
    8.4.2 如何实现至少一次
       1)开启ack机制,即ack数目大于0(默认每个worker一个ack)
       2)Spout 实现可靠性传输保证
         Spout 发送消息时附带 message 的 ID
         如果收到 Acker 的处理失败反馈,需要进行消息重传,即实现 fail 函数
       3)Bolt 在处理成功或失败后需要调用相应的方法通知 Acker  
    8.4.3 实现仅仅一次
    storm原生api不能做到,要实现仅仅一次,需要存储tuple的状态,用来防止重复发送的数据被重复处理,在 Storm 中使用 Trident API 实现


九、思考?
   1、storm消息重发,重发的消息从哪来?重复的消息怎么处理?
      storm需要维护msgId和具体tuple之间的关系  
   2、如果一个bolt总是处理失败导致spout重发,维护的数据越来越多,最后OOM,怎么办?  
       storm在以前的版本里面可以设置max.pending数(前提条件是必须开启ack),即如果spout组件发现还有这么多消息没有给他ack,当达到这个阀值的时候,就不会在接着发tuple给bolt
     PS:这种设置方法个人感觉值得商榷,首先这个值应该设置多少,设置小了,那么整个集群的吞吐量上不去,设置大了很有可能导致内存溢出。


十、事物


     storm事物在0.7版本后封装到trident中,虽然原生的事物api已经废弃,但是对于我们理解高级部分trident还是非常有帮助的。事物在storm里面主要分为2个节点,事物的处理阶段和提交阶段,事物的处理阶段是可以并行处理的,但是是提交阶段必须按照事物id依次排队处理。
   storm里面事物主要分为3种,一种是普通的事物,一种是分区事物,一种是不透明事物(事物的更高级封装)
  1、普通事物ITransactionSpout接口,事物处理阶段起多个线程,提交阶段起1个线程
     Coordinator:初始化事物
     Emitter:emitBatch接收初始化事物生成的事物元数据,发送出去
  2、分区事物IPartitionTransactionSpout接口,主流的事物Spout
      Coordinator:isReady和返回分区个数
      Emitter: emitPartitionBatchNew 发送新分区的元数据
               emitPartitionBatch 事物处理失败后重发
  3、不透明事物IOpaqueTransactionSpout,容错性最好的事物Spout
      它不区分每一批是不是发送的tuple是一样的,因为即使某个分区不可用,它还会继续发送可用的分区,等其他不可用的分区可用后,这些分区里面的数据被放到下一个事物batch里面执

正因为当初对未来做了太多的憧憬,所以对现在的自己尤其失望。生命中曾经有过的所有灿烂,终究都需要用寂寞来偿还。
原文地址:https://www.cnblogs.com/candlia/p/11920214.html