storm架构原理及集群部署

Storm 流式计算

1. 概念

1.1 离线计算和实时计算

离线计算:批量获取数据、批量传输数据、周期性批量计算数据、数据展示

​ 代表技术:Sqoop批量导入数据、HDFS批量存储数据、MapReduce批量计算数据、Hive批量计算数据、zookeeper任务调度

1、hivesql

2、调度平台

3、Hadoop集群运维

4、数据清洗(脚本语言)

5、元数据管理

6、数据稽查

7、数据仓库模型架构

流式计算:数据实时产生、数据实时传输、数据实时计算、实时展示

​ 代表技术:Flume实时获取数据、Kafka/metaq实时数据存储、Storm/JStorm实时数据计算、Redis实时结果缓存、持久化存储(mysql)。

​ 一句话总结:将源源不断产生的数据实时收集并实时计算,尽可能快的得到计算结果

区别

​ 最大的区别:实时收集、实时计算、实时展示

1.2 Storm是什么?

​		 ![img](img/storm-flow.png)

​ Flume实时采集, 低延迟

​ Kafka消息队列, 低延迟

​ Storm实时计算, 低延迟

​ Redis实时存储, 低延迟

​ Storm用来实时处理数据,特点:低延迟、高可用、分布式、可扩展、数据不丢失。提供简单容易理解的接口,便于开发。

1.3 Storm与Hadoop的区别

  • Storm用于实时计算,Hadoop用于离线计算。
  • Storm处理的数据保存在内存中,源源不断;Hadoop处理的数据保存在文件系统中,一批一批。
  • Storm的数据通过网络传输进来;Hadoop的数据保存在磁盘中。
  • Storm与Hadoop的编程模型相似

1561338076765

Job:任务名称

JobTracker:项目经理

TaskTracker:开发组长、产品经理

Child:负责开发的人员

Mapper/Reduce:开发人员中的两种角色,一种是服务器开发、一种是客户端开发

Topology:任务名称

Nimbus:项目经理

Supervisor:开组长、产品经理

Worker:开人员

Spout/Bolt:开人员中的两种角色,一种是服务器开发、一种是客户端开发


Storm:进程、线程常驻内存运行,数据不进入磁盘,数据通过网络传递。

MapReduce:为TB、PB级别数据设计的批处理计算框架。

1561341354801

1.4 Storm应用场景及行业案例

Storm用来实时计算源源不断产生的数据,如同流水线生产。

  1. 运用场景

日志分析

​ 从海量日志中分析出特定的数据,并将分析的结果存入外部存储器用来辅佐决策。

管道系统

​ 将一个数据从一个系统传输到另外一个系统,比如将数据库同步到Hadoop

消息转化器

​ 将接受到的消息按照某种格式进行转化,存储到另外一个系统如消息中间件

  1. 行业案例

一淘-实时分析系统:实时分析用户的属性,并反馈给搜索引擎

最初,用户属性分析是通过每天在云梯上定时运行的MR job来完成的。为了满足实时性的要求,希望能够实时分析用户的行为日志,将最新的用户属性反馈给搜索引擎,能够为用户展现最贴近其当前需求的结果。

携程-网站性能监控:实时分析系统监控携程网的网站性能

利用HTML5提供的performance标准获得可用的指标,并记录日志。Storm集群实时分析日志和入库。使用DRPC聚合成报表,通过历史数据对比等判断规则,触发预警事件。

阿里妈妈-用户画像:实时计算用户的兴趣数据

为了更加精准投放广告,阿里妈妈后台计算引擎需要维护每个用户的兴趣点(理想状态是,你对什么感兴趣,就向你投放哪类广告)。用户兴趣主要基于用户的历史行为、用户的实时查询、用户的实时点击、用户的地理信息而得,其中实时查询、实时点击等用户行为都是实时数据。考虑到系统的实时性,阿里妈妈使用Storm维护用户兴趣数据,并在此基础上进行受众定向的广告投放。

2. Storm核心组件(重要)

在这里插入图片描述

Nimbus:是整个集群的控管核心,负责topology的提交、运行状态监控、任务重新分配等工作。

zookeeper就是一个管理者,监控者,Storm的所有的状态信息都是保存在Zookeeper里面,nimbus通过在zookeeper上面写状态信息来分配任务,supervisor,task通过从zookeeper中读状态来领取任务,同时supervisor, task也会定义发送心跳信息到zookeeper,使得nimbus可以监控整个storm集群的状态,从而可以重启一些挂掉的task。ZooKeeper使得整个storm集群十分的健壮,任何一台工作机器挂掉都没有关系,只要重启然后从zookeeper上面重新获取状态信息就可以了。

Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。通过配置文件设置当前supervisor上启动多少个worker,默认4个。

Worker:运行具体处理组件逻辑的进程(在Supervisor)。Worker运行的任务类型只有两种,一种是Spout任务,一种是Bolt任务。

Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,不同spout/bolt的task可能会共享一个物理线程,该线程称为executor。

总体描述:nimbus下命令(分配任务),zk监督执行(心跳监控,worker、supurvisor的心跳都归它管),supervisor领旨(下载代码),招募人马(创建worker和线程等),worker、executor就给我干活!task就是具体要干的活。

Storm集群中有两类节点:主控节点(Master Node)和工作节点(Worker Node)。其中,主控节点只有一个,而工作节点可以有多个。

主控节点运行一个称为Nimbus的守护进程类似于Hadoop的JobTracker。Nimbus负责在集群中分发代码,对节点分配任务,并监视主机故障。

每个工作节点运行一个称为Supervisor的守护进程。Supervisor监听其主机上已经分配的主机的作业,启动和停止Nimbus已经分配的工作进程。

流分组,是拓扑定义中的一部分,为每个Bolt指定应该接收哪个流作为输入。流分组定义了流/元组如何在Bolt的任务之间进行分发。Storm内置了8种流分组方式。

Worker是Spout/Bolt中运行具体处理逻辑的进程。一个worker就是一个进程,进程里面包含一个或多个线程。

一个线程就是一个executor,一个线程会处理一个或多个任务。

一个任务就是一个task。

3. Storm编程模型(重要)

在这里插入图片描述

Topology:Storm中运行的一个实时应用程序的名称,因为各个组件间的消息流动而形成逻辑上的拓扑结构。(拓扑-DAG有向无环图的实现)

把实时应用程序的运行逻辑打成jar包后提交到Storm的拓扑(Topology)。Storm的拓扑类似于MapReduce的作业(Job)。其主要的区别是,MapReduce的作业最终会完成,而一个拓扑永远都在运行直到它被杀死。一个拓扑是一个图的Spout和Bolt的连接流分组。

Spout:在一个topology中获取源数据流的组件,Spout是拓扑的流的来源,是一个拓扑中产生源数据流的组件。通常情况下,Spout会从外部数据源中读取数据,然后转换为拓扑内部的源数据。

​ Spout可以是可靠的,也可以是不可靠的。如果Storm处理元组失败,可靠的Spout能够重新发射,而不可靠的Spout就尽快忘记发出的元组。

​ Spout可以发出超过一个流。

Spout的主要方法是nextTuple()。NextTuple()会发出一个新的Tuple到拓扑,如果没有新的元组发出,则简单返回。

​ Spout的其他方法是ack()和fail()。当Storm检测到一个元组从Spout发出时,ack()和fail()会被调用,要么成功完成通过拓扑,要么未能完成。Ack()和fail()仅被可靠的Spout调用。

​ IRichSpout是Spout必须实现的接口。

通常情况下spout会从外部数据源中读取数据,然后转换为topology内部的源数据。

Bolt:接受数据然后执行处理的组件,用户可以在其中执行自己想要的操作。

在拓扑中所有处理都在Bolt中完成,Bolt是流的处理节点,从一个拓扑接收数据,然后执行进行处理的组件。Bolt可以完成过滤、业务处理、连接运算、连接与访问数据库等任何操作。

Bolt是一个被动的角色,接口中有一个execute()方法,在接收到消息后会调用此方法,用户可以在其中执行自己希望的操作。

Bolt可以完成简单的流的转换,而完成复杂的流的转换通常需要多个步骤,因此需要多个Bolt。

Bolt可以发出超过一个的流。

Tuple:是消息传递的基本单元,是一个命名的值列表,元组中的字段可以是任何类型的对象。Storm使用元组作为其数据模型,元组支持所有的基本类型、字符串和字节数组作为字段值,只要实现类型的序列化接口就可以使用该类型的对象。元组本来应该是一个key-value的Map,但是由于各个组件间传递的元组的字段名称已经事先定义好,所以只要按序把元组填入各个value即可,所以元组是一个value的List。

Stream:是一个无界的元组系列。源源不断传递的元组就组成了流,在分布式环境中并行地进行创建和处理。

1561341449896

4. 流式计算一般架构图(重要)

在这里插入图片描述

5. Storm vs Spark Streaming

5.1 Storm适用场景

  1. 需要纯实时,不能忍受1秒以上延迟的场景下使用,比如金融系统
  2. 对于延迟需求很高的纯粹的流处理工作负载
  3. 需求主要集中在流处理与CEP(即复杂事件处理)式处理层面
  4. 若还需要针对高峰低峰时间段,动态调整实时计算程序的并行度,以最大限度利用集群资源(通常是在小型公司,集群资源紧张的情况),也可以考虑用Storm
  5. 如果一个大数据应用系统,它就是纯粹的实时计算,不需要在中间执行SQL交互式查询、复杂的transformation算子等,那么用Storm是比较好的选择

5.2 Spark Streaming适用场景

  1. 如果对上述适用于Storm的几点,一条都不满足的实时场景,即:不要求纯实时,不要求动态调整并行度等,那么可以考虑使用Spark Streaming
  2. 考虑使用Spark Streaming最主要的一个因素,应该是针对整个项目进行宏观的考虑,即:如果一个项目除了实时计算之外,还包括了离线批处理、交互式查询等业务功能,而且实时计算中,可能还会牵扯到高延迟批处理、交互式查询等功能,那么就应该首选Spark生态,用Spark Core开发离线批处理,用Spark SQL开发交互式查询,用Spark Streaming开发实时计算,三者可以无缝整合,给系统提供非常高的可扩展性 Spark Streaming与Storm的优劣分析事实上,Spark Streaming绝对谈不上比Storm优秀。
  3. 必须利用交互式shell通过API调用实现数据探索

5.3 技术特点对比

对比项   storm spark
处理方式   流式数据处理、移动数据(数据流入计算节点) 批处理数据、移动计算(针对数据形成任务进行计算)
延迟性   >=100ms 2s左右
吞吐量 Low High
容错性   ack组件进行数据流的跟踪,开销大 通过lineage以及在内存维护两份数据备份进行容错
事务性   通过跟踪机制能保证每个记录至少被处理一次,如果需要保证状态只更新一次的话,需要由用户自己来实现。 保证数据只被处理一次,并且是在批处理的层次级别。对于statefull的计算,对事务性比较高的话,Spark streaming要更好一些。
动态调整并行度   支持 不支持
数据处理保证   at least once(实现采用record-level acknowledgments),Trident可以支持storm 提供exactly once语义。 exactly once(实现采用Chandy-Lamport 算法,即marker-checkpoint )

如果对延迟要求不高的情况下,建议使用Spark Streaming,丰富的高级API,使用简单,天然对接Spark生态栈中的其他组件,吞吐量大,部署简单,UI界面也做的更加智能,社区活跃度较高,有问题响应速度也是比较快的,比较适合做流式的ETL,而且Spark的发展势头也是有目共睹的,相信未来性能和功能将会更加完善。

6. 分组策略和并发度

6.1 分组策略(Stream Grouping)

stream grouping用来定义一个stream应该如何分配给Bolts上面的多个Executors(多线程、多并发)。Storm里面有7种类型的stream grouping

1)Shuffle Grouping: 随机分组,轮询,平均分配。随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。

2)Fields Grouping**:按字段分组**,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。

3)All Grouping**:广播发送**,对于每一个tuple,所有的bolts都会收到。

4)Global Grouping**:全局分组**,这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。

5)Non Grouping**:不分组**,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果。在多线程情况下不平均分配。

6)Direct Grouping**:直接分组**,这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。

7)Local or Shuffle Grouping**:**如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发送给这些tasks。否则,和普通的Shuffle Grouping行为一致。

8**)customer Grouping**:自定义,相当于MapReduce自己自己去实现一个partition。

6.2 并发度

6.2.1 场景分析

1)单线程下:加减乘除、全局汇总

2)多线程下:局部加减乘除、持久化DB等

(1)思考:如何计算:word总数和word个数?并且在高并发下完成

前者是统计总行数,后者是去重word个数;

类似企业场景:计算网站PV和UV

(2)网站最常用的两个指标:

PV(page views):count (session_id) 即页面浏览量。

UV(user views):count(distinct session_id) 即独立访客数。

a)用ip地址分析

指访问某个站点或点击某个网页的不同IP地址的人数。在同一天内,UV只记录第一次进入网站的具有独立IP的访问者,在同一天内再次访问该网站则不计数。

b)用Cookie分析UV值

当客户端第一次访问某个网站服务器的时候,网站服务器会给这个客户端的电脑发出一个CookieCCookieCookie

实时处理的业务场景主要包括:汇总型(如网站PV、销售额、订单数)、去重型(网站UV、顾客数、销售商品数)

6.2.2 并发度

并发度:用户指定一个任务,可以被多个线程执行,并发度的数量等于线程executor的数量。

Task就是具体的处理逻辑对象,一个executor线程可以执行一个或多个tasks,但一般默认每个executor只执行一个task,所以我们往往认为task就是执行线程,其实不是。

Task代表最大并发度,一个component的task数是不会改变的,但是一个componet的executer数目是会发生变化的(storm rebalance命令),task数>=executor数,executor数代表实际并发数。

在这里插入图片描述

对于并发度的配置, 在storm里面可以在多个地方进行配置, 优先级为:

defaults.yaml < storm.yaml < topology-specific configuration

<internal component-specific configuration < external component-specific configuration

worker processes的数目, 可以通过配置文件和代码中配置, worker就是执行进程, 所以考虑并发的效果, 数目至少应该大亍machines的数目

executor的数目, component的并发线程数,只能在代码中配置(通过setBolt和setSpout的参数), 例如, setBolt(“green-bolt”, new GreenBolt(), 2)

tasks的数目, 可以不配置, 默认和executor1:1, 也可以通过setNumTasks()配置

Topology的worker数通过config设置,即执行该topology的worker(java)进程数。它可以通过 storm rebalance 命令任意调整。

		TopologyBuilder topologyBuilder = new TopologyBuilder();
		Config conf = new Config();
		conf.setNumWorkers(2); // 用2个worker
		topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // 设置2个并发度
		topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2).setNumTasks(4).shuffleGrouping("blue-spout"); // 设置2个并发度,4个任务
		topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6).shuffleGrouping("green-bolt"); // 设置6个并发度
		LocalCluster localCluster = new LocalCluster();
		localCluster.submitTopology("mytopology", conf, topologyBuilder.createTopology());

在这里插入图片描述

3个组件的并发度加起来是10,就是说拓扑一共有10个executor,一共有2个worker,每个worker产生10 / 2 = 5条线程。

绿色的bolt配置成2个executor和4个task。为此每个executor为这个bolt运行2个task。

动态的改变并行度

Storm支持在不 restart topology 的情况下, 动态的改变(增减) worker processes 的数目和 executors 的数目, 称为rebalancing. 通过Storm web UI,或者通过storm rebalance命令实现:

storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

集群部署

1、 集群部署的基本流程
集群部署的流程:下载安装包、解压安装包、修改配置文件、分发安装包、启动集群
注意:
启动zookeeper集群
所有的集群上都需要配置hosts
vi /etc/hosts
192.168.239.128 storm01 zk01 hadoop01
192.168.239.129 storm02 zk02 hadoop02
192.168.239.130 storm03 zk03 hadoop03

2、 集群部署的基础环境准备
安装前的准备工作(zk集群已经部署完毕)
 关闭防火墙
chkconfig iptables off && setenforce 0
 创建用户
groupadd realtime && useradd realtime && usermod -a -G realtime realtime
 创建工作目录并赋权
mkdir /export
mkdir /export/servers
chmod 755 -R /export
 切换到realtime用户下
su realtime
3、Storm集群部署
3.1、上传安装包
3.2、解压安装包
tar -zxvf apache-storm-0.9.5.tar.gz -C /export/servers/
cd /export/servers/
ln -s apache-storm-0.9.5 storm
3.3、修改配置文件
mv /export/servers/storm/conf/storm.yaml /export/servers/storm/conf/storm.yaml.bak
vi /export/servers/storm/conf/storm.yaml
输入以下内容:
在这里插入图片描述
3.4、分发安装包
scp -r /export/servers/apache-storm-0.9.5 storm02:/export/servers
然后分别在各机器上创建软连 接
cd /export/servers/
ln -s apache-storm-0.9.5 storm
3.5、启动集群
 在nimbus.host所属的机器上启动 nimbus服务
cd /export/servers/storm/bin/
nohup ./storm nimbus &
 在nimbus.host所属的机器上启动ui服务
cd /export/servers/storm/bin/
nohup ./storm ui &
 在其它个点击上启动supervisor服务
cd /export/servers/storm/bin/
nohup ./storm supervisor &
3.6、查看集群
访问nimbus.host:/8080,即可看到storm的ui界面。

在这里插入图片描述

原文地址:https://www.cnblogs.com/ernst/p/12819173.html