day18-19 Storm

课程介绍

课程名称:Storm是什么

课程目标:

通过该课程的学习能够了解离线计算与流式计算的区别、掌握Storm框架的基础知识、了解流式计算的一般架构图。

课程大纲:

1、 离线计算是什么?

2、 流式计算是什么?

3、 流式计算与离线计算的区别?

4、 Storm是什么?

5、 StormHadoop的区别?

6、 Storm的应用场景及行业案例

7、 Storm的核心组件(重点掌握)

8、 Storm的编程模型(重点掌握)

9、 流式计算的一般架构图(重点掌握)

背景介绍

Storm背景介绍

课程内容

1、离线计算是什么?

离线计算:批量获取数据、批量传输数据、周期性批量计算数据、数据展示

代表技术:Sqoop批量导入数据、HDFS批量存储数据、MapReduce批量计算数据、Hive批量计算数据、***任务调度

1hivesql

2、调度平台

3Hadoop集群运维

4、数据清洗(脚本语言)

5、元数据管理

6、数据稽查

7、数据仓库模型架构

2、流式计算是什么

  流式计算:数据实时产生、数据实时传输、数据实时计算、实时展示

  代表技术:Flume实时获取数据、Kafka/metaq实时数据存储、Storm/JStorm实时数据计算、Redis实时结果缓存、持久化存储(mysql)

一句话总结:将源源不断产生的数据实时收集并实时计算,尽可能快的得到计算结果

3、离线计算与实时计算的区别

最大的区别:实时收集、实时计算、实时展示

4Storm是什么?

Flume实时采集,低延迟

Kafka消息队列,低延迟

Storm实时计算,低延迟

Redis实时存储,低延迟

Storm实时处理数据,特点:低延迟、高可用、分布式、可扩展、数据不丢失。提供简单容易理解的接口,便于开发。

海量数据?数据类型很多,产生数据的终端很多,处理数据能力增强

5StormHadoop的区别

l Storm用于实时计算,Hadoop用于离线计算。

l Storm处理的数据保存在内存中,源源不断;Hadoop处理的数据保存在文件系统中,一批一批。

l Storm的数据通过网络传输进来;Hadoop的数据保存在磁盘中。

l StormHadoop的编程模型相似

 

Job:任务名称

JobTracker:项目经理

TaskTracker:开发组长、产品经理

Child:负责开发的人员

Mapper/Reduce:开发人员中的两种角色,一种是服务器开发、一种是客户端开发

Topology:任务名称

Nimbus:项目经理

Supervisor:开组长、产品经理

Worker:开人员

Spout/Bolt:开人员中的两种角色,一种是服务器开发、一种是客户端开发

6Storm应用场景及行业案例

Storm用来实时计算源源不断产生的数据,如同流水线生产。

6.1、运用场景

日志分析

海量日志中分析出特定的数据,并将分析的结果存入外部存储器用来辅佐决策。

管道系统

将一个数据从一个系统传输到另外一个系统,比如将数据库同步到Hadoop

消息转化器

将接受到的消息按照某种格式进行转化,存储到另外一个系统如消息中间件

6.2、典型案列

一淘-实时分析系统:实时分析用户的属性,并反馈给搜索引擎

最初,用户属性分析是通过每天在云梯上定时运行的MR job来完成的。为了满足实时性的要求,希望能够实时分析用户的行为日志,将最新的用户属性反馈给搜索引擎,能够为用户展现最贴近其当前需求的结果。

携程-网站性能监控:实时分析系统监控携程网的网站性能

利用HTML5提供的performance标准获得可用的指标,并记录日志。Storm集群实时分析日志和入库。使用DRPC聚合成报表,通过历史数据对比等判断规则,触发预警事件。

阿里妈妈-用户画像:实时计算用户的兴趣数据

为了更加精准投放广告,阿里妈妈后台计算引擎需要维护每个用户的兴趣点(理想状态是,你对什么感兴趣,就向你投放哪类广告)。用户兴趣主要基于用户的历史行为、用户的实时查询、用户的实时点击、用户的地理信息而得,其中实时查询、实时点击等用户行为都是实时数据。考虑到系统的实时性,阿里妈妈使用Storm维护用户兴趣数据,并在此基础上进行受众定向的广告投放。

7Storm核心组件(重要)

 

l Nimbus:负责资源分配和任务调度。

l Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。---通过配置文件设置当前supervisor上启动多少个worker

l Worker:运行具体处理组件逻辑的进程。Worker运行的任务类型只有两种,一种是Spout任务,一种是Bolt任务。

l Taskworker每一个spout/bolt的线程称为一个task. storm0.8之后,task不再与物理线程对应,不同spout/bolttask可能会共享一个物理线程,该线程称为executor

 

8Storm编程模型(重要)

 

1、编程模型
    DataSource:外部数据源
    Spout:接受外部数据源的组件,将外部数据源转化成Storm内部的数据,以Tuple为基本的传输单元下发给Bolt
    Bolt:接受Spout发送的数据,或上游的bolt的发送的数据。根据业务逻辑进行处理。发送给下一个Bolt或者是存储到某种介质上。介质可以是Redis可以是mysql,或者其他。
    Tuple:Storm内部中数据传输的基本单元,里面封装了一个List对象,用来保存数据。
    StreamGrouping:数据分组策略   (也就是为什么从这个bolt到下个bolt,为什么这么走,是可以设置的)
        7种:shuffleGrouping(Random函数),Non Grouping(Random函数),FieldGrouping(Hash取模)、Local or ShuffleGrouping 本地或随机,优先本地。
        
2、并发度
    用户指定的一个任务,可以被多个线程执行,并发度的数量等于线程的数量。一个任务的多个线程,会被运行在多个Worker(JVM)上,有一种类似于平均算法的负载均衡策略。
尽可能减少网络IO,和Hadoop中的MapReduce中的本地计算的道理一样。
3、架构 Nimbus:任务分配 Supervisor:接受任务,并启动worker。worker的数量根据端口号来的。 Worker:执行任务的具体组件(其实就是一个JVM),可以执行两种类型的任务,Spout任务或者bolt任务。 Task:Task=线程=executor。 一个Task属于一个Spout或者Bolt并发任务。 Zookeeper:保存任务分配的信息、心跳信息、元数据信息。 4
、Worker与topology 一个worker只属于一个topology,每个worker中运行的task只能属于这个topology。 反之,一个topology包含多个worker,其实就是这个topology运行在多个worker上。 一个topology要求的worker数量如果不被满足,集群在任务分配时,根据现有的worker先运行topology。如果当前集群中worker数量为0,那么最新提交的topology将只会被标识active,不会运行,只有当集群有了空闲资源之后,才会被运行。

9、流式计算一般架构图(重要)

 

PS:前三个是用来相当于数据源。

 其中flume用来获取数据。

l Kafka用来临时保存数据。

l Strom用来计算数据。

l Redis是个内存数据库,用来保存数据。

-----------------------------------Strom安装配置

http://blog.csdn.net/kwu_ganymede/article/details/52169861

 https://www.cnblogs.com/zhaojiankai/p/7257617.html

PS:在安装配置的过程中遇到ui启动的错误,看了这个博客,执行了命令算是启动了
PS:同时又安装配置了一次zookeeper,详情看zookeeper的我的博客,也可参看第二个视频
PS:配置时,确保已经启动的了zookeeper

 

1、 集群部署的基本流程

集群部署的流程:下载安装包、解压安装包、修改配置文件、分发安装包、启动集群

注意:

    所有的集群上都需要配置hosts

    vi  /etc/hosts

192.168.239.128 storm01 zk01 hadoop01

      192.168.239.129 storm02 zk02 hadoop02

  192.168.239.130 storm03 zk03 hadoop03

2、 集群部署的基础环境准备

安装前的准备工作(zk集群已经部署完毕)

关闭防火墙

chkconfig iptables off  && setenforce 0

l 创建用户

groupadd realtime && useradd realtime && usermod -a -G realtime realtime

l 创建工作目录并赋权

mkdir /export

mkdir /export/servers

chmod 755 -R /export

切换到realtime用户下

su realtime

3Storm集群部署

3.1、下载安装包

wget    http://124.202.164.6/files/1139000006794ECA/apache.fayea.com/storm/apache-storm-0.9.5/apache-storm-0.9.5.tar.gz

3.2、解压安装包

tar -zxvf apache-storm-0.9.5.tar.gz -C /export/servers/

cd /export/servers/

ln -s apache-storm-0.9.5 storm

3.3、修改配置文件

mv /export/servers/storm/conf/storm.yaml /export/servers/storm/conf/storm.yaml.bak

vi /export/servers/storm/conf/storm.yaml

输入以下内容:

#指定storm使用的zk集群
storm.zookeeper.servers:
     - "bee1"
     - "bee2"
     - "bee3"
#指定storm本地状态保存地址
storm.local.dir: "/apps/storm/workdir"
#指定storm集群中的nimbus节点所在的服务器
nimbus.host: "bee1"
#指定nimbus启动JVM最大可用内存大小
nimbus.childopts: "-Xmx1024m"
#指定supervisor启动JVM最大可用内存大小
supervisor.childopts: "-Xmx1024m"
#指定supervisor节点上,每个worker启动JVM最大可用内存大小
worker.childopts: "-Xmx768m"
#指定ui启动JVM最大可用内存大小,ui服务一般与nimbus同在一个节点上。
ui.childopts: "-Xmx768m"
#指定supervisor节点上,启动worker时对应的端口号,每个端口对应槽,每个槽位对应一个worker
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

3.4、分发安装包

scp -r /export/servers/apache-storm-0.9.5 storm02:/export/servers

然后分别在各机器上创建软连接

cd /export/servers/

ln -s apache-storm-0.9.5 storm

3.5、启动集群

nimbus.host所属的机器上启动 nimbus服务

cd /export/servers/storm/bin/

nohup ./storm nimbus &

nimbus.host所属的机器上启动ui服务

cd /export/servers/storm/bin/

nohup ./storm ui &

在其它个点击上启动supervisor服务

cd /export/servers/storm/bin/

nohup ./storm supervisor &

3.6、查看集群

访问nimbus.host:/8080,即可看到stormui界面。

PS:对从机器执行storm supervisor&,我们看一下可执行机器

PS:每个集群有4个,所以有8个

PS:对从机器介绍

测试wordCount例子

PS:storm jar /root/apps/storm/examples/storm-starter/storm-starter-topologies-0.9.5.jar storm.starter.WordCountTopology wordcount

PS:有3个workerS,就会有3个jvm,会有3个端口号,一个对口号就是一个槽

PS:同时提交4个以后,不够的只能等待了;当有空闲的worker就会执行空闲的

4、Worker与topology
    一个worker只属于一个topology,每个worker中运行的task只能属于这个topology。    反之,一个topology包含多个worker,其实就是这个topology运行在多个worker上。
    一个topology要求的worker数量如果不被满足,集群在任务分配时,根据现有的worker先运行topology。如果当前集群中worker

PS:提交命令以后,worker自动启动,杀死程序以后,worker就没有了

 ------------------------------------------

PS:从上面可以看到,我们现在的研究主要是两个

1.并发度如何配置

2.worker的数量如何配置

PS:我们可以从源码中看到,在代码中设置的并发读和任务的执行度,下面分别是java、hadoop、storm的执行方式

 

PS:当在执行maven程序的时候,要注意scope,因为上传的时候,集群中都有了,所以不用再执行了,所有要注释

 

 ------------------------------

package cn.itcast.storm;


import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;


/**
 * Created by bee on 2018/3/19.
 */
public class WordCountTopologMain {

     public static void main(String [] args) throws AlreadyAliveException, InvalidTopologyException {
         //1、准备一个TopologyBuilder
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         topologyBuilder.setSpout("mySpout",new MySpout(),1);//设置并发度
         topologyBuilder.setBolt("mybolt1",new MySplitBolt(),10).shuffleGrouping("mySpout");
         topologyBuilder.setBolt("mybolt2",new MyCountBolt(),2).fieldsGrouping("mybolt1",new Fields("word"));
         //2、创建一个configuration,用来指定当前topology 需要的worker的数量
         Config config =  new Config();
         config.setNumWorkers(2);

         //3、提交任务  -----两种模式 本地模式和集群模式
        // StormSubmitter.submitTopology("mywordcount",config,topologyBuilder.createTopology());
         LocalCluster localCluster = new LocalCluster();
         localCluster.submitTopology("mywordcount",config,topologyBuilder.createTopology());
     }
}
package cn.itcast.storm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

/**
 * Created by bee on 2018/3/19.
 */
public class MySpout extends BaseRichSpout {
    SpoutOutputCollector collector;
    /**
     * 初始化方法,给nextTuple使用
     * @param map
     * @param topologyContext
     * @param spoutOutputCollector
     */
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        this.collector=collector;
    }
    //storm 框架在 while(true) 调用nextTuple方法
    public void nextTuple() {
        collector.emit(new Values("bi yang qiang love zhao huan huan"));//这个values继承自ArrayList
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("Love"));//我自己的声明,想叫什么叫什么
    }

}
package cn.itcast.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

/**
 * Created by bee on 2018/3/19.
 */
public class MySplitBolt  extends BaseRichBolt {

    OutputCollector collector;
    //初始化方法
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    // 被storm框架 while(true) 循环调用  传入参数tuple;这个参数来自于spout所发射的
    public void execute(Tuple input) {
        String line = input.getString(0);//这里是0的原因是,因为发的是List的一个值,所有取list的第一个元素,这个是自己知道
        String[] arrWords = line.split(" ");
        for (String word:arrWords){
            collector.emit(new Values(word,1));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","num"));//声明的原因是上面发射出来两个字段
    }
}
package cn.itcast.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by bee on 2018/3/19.
 */
public class MyCountBolt extends BaseRichBolt {
    OutputCollector collector;
    Map<String, Integer> map = new HashMap<String, Integer>();

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }
    public void execute(Tuple input) {
        String word = input.getString(0);//因为上面发了两处数据
        Integer num = input.getInteger(1);
        System.out.println(Thread.currentThread().getId() + "    word:"+word);
        if (map.containsKey(word)){
            Integer count = map.get(word);
            map.put(word,count + num);
        }else {
            map.put(word,num);
        }
        System.out.println("count:"+map);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //不輸出
    } 
}

PS:nimbus负责任务的分配,在程序中我设置了 wordcount的执行线程数,spout数量。那么就有8个task;同时设置两个worker;worker就会对上诉的task就行细分

然后在不同bolt传输的规则还不太一样,见最下面。

架构模型图 

  

 ------------------------

PS:实时是今后的趋势。

2Storm通信机制

Worker间的通信经常需要通过网络跨节点进行,Storm使用ZeroMQNetty(0.9以后默认使用)作为进程间通信的消息框架。

Worker进程内部通信:不同workerthread通信使用LMAX Disruptor来完成。

  不同topologey之间的通信,Storm不负责,需要自己想办法实现,例如使用kafka等;

2.1Worker进程间通信

worker进程间消息传递机制,消息的接收和处理的大概流程见下图

PS:详细看课件,本节听的不是太明白

 

 

 

原文地址:https://www.cnblogs.com/bee-home/p/8591302.html