Storm — Stream Groupings and Parallelism

Chapter 5: Stream Groupings and Parallelism

5.1 Thoughts on the File-Reading Example

1) Spout data sources: databases, files, MQs (e.g. Kafka)

2) Database as a source: only suitable for reading configuration-style data from a database

3) File as a source: only suitable for testing and teaching (a file on one machine cannot serve a distributed cluster)

4) Typical steps for processing log files generated in production:

       (1) Read the content and write it into an MQ

       (2) Let Storm process it from there

5.2 Stream Groupings

A stream grouping defines how a stream's tuples should be partitioned among the executors (concurrent threads) of the receiving bolt.

Storm has seven built-in types of stream grouping; a wiring sketch for all of them follows the list.

1) Shuffle Grouping: random grouping with an even, round-robin-like distribution. Tuples are randomly distributed across the bolt's tasks so that each task receives roughly the same number of tuples.

2) Fields Grouping: group by field. For example, when grouping by userid, tuples with the same userid always go to the same task of the bolt, while different userids may be routed to different tasks.

3) All Grouping: broadcast. Every task of the bolt receives a copy of every tuple.

4) Global Grouping: the entire stream is routed to a single one of the bolt's tasks; specifically, the task with the lowest task id.

5) Non Grouping: no grouping. This grouping declares that the stream does not care which task receives its tuples. It currently behaves the same as shuffle grouping, although with multiple threads the distribution is not guaranteed to be even.

6) Direct Grouping: a special grouping in which the sender of a tuple decides exactly which task of the consumer processes it. It can only be declared on streams that have been declared as direct streams, and tuples on such a stream must be emitted with emitDirect. A consumer can look up the task ids of the components processing its messages through the TopologyContext (the OutputCollector.emit method also returns the task ids that received the tuple).

7) Local or Shuffle Grouping: if the target bolt has one or more tasks in the same worker process, tuples are shuffled to just those in-process tasks; otherwise it behaves like a normal shuffle grouping.
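Each of these groupings is declared with a single call on the BoltDeclarer returned by setBolt. A minimal wiring sketch, reusing the WebLogSpout/WebLogBolt classes from the tests below (the bolt ids b1–b7 are made up for illustration):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new WebLogSpout(), 1);
builder.setBolt("b1", new WebLogBolt(), 2).shuffleGrouping("spout");
builder.setBolt("b2", new WebLogBolt(), 2).fieldsGrouping("spout", new Fields("log"));
builder.setBolt("b3", new WebLogBolt(), 2).allGrouping("spout");
builder.setBolt("b4", new WebLogBolt(), 2).globalGrouping("spout");
builder.setBolt("b5", new WebLogBolt(), 2).noneGrouping("spout");
builder.setBolt("b6", new WebLogBolt(), 2).localOrShuffleGrouping("spout");
// direct grouping additionally requires the spout to declare the stream as direct,
// e.g. declarer.declareStream("direct", true, new Fields("log")), and to emit on it
// with collector.emitDirect(taskId, "direct", tuple)
builder.setBolt("b7", new WebLogBolt(), 2).directGrouping("spout", "direct");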

8) Tests

         (1) Spout parallelism set to 2, bolt parallelism set to 1, shuffleGrouping:

builder.setSpout("WebLogSpout", new WebLogSpout(),2);

builder.setBolt("WebLogBolt", new WebLogBolt(), 1).shuffleGrouping("WebLogSpout");

With two spout executors the file is read twice, so every line is printed twice. If the data source were a message queue, the data would not be read twice (within the same consumer group, each message is delivered to only one consumer; see the sketch after the sample output below).

Thread-33-WebLogBolt-executor[1 1]lines:60  session_id:CYYH6Y2345GHI899OFG4V9U567
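The consumer-group behavior is a property of the queue, not of Storm. A minimal, hypothetical Kafka sketch (the broker address, topic name, and group id are made up): starting two copies of this program with the same group.id splits the topic's partitions between them, so each record is read by only one of the two.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerGroupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("group.id", "weblog-group");             // same group => no duplicate reads
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("weblog"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.println(r.value());
                }
            }
        }
    }
}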

         (2) Spout parallelism set to 1, bolt parallelism set to 2, noneGrouping:

builder.setSpout("WebLogSpout", new WebLogSpout(),1);

builder.setBolt("WebLogBolt", new WebLogBolt(), 2).noneGrouping("WebLogSpout");

Each bolt task receives different data.

Thread-33-WebLogBolt-executor[1 1]lines:14  session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678

Thread-34-WebLogBolt-executor[2 2]lines:16  session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678

         (3) Spout parallelism set to 1, bolt parallelism set to 2, fieldsGrouping:

builder.setSpout("WebLogSpout", new WebLogSpout(),1);

builder.setBolt("WebLogBolt", new WebLogBolt(), 2).fieldsGrouping("WebLogSpout", new Fields("log"));

The effect is not obvious with this web-log example; the later cases make it clearer.

         (4) Spout parallelism set to 1, bolt parallelism set to 2, allGrouping:

builder.setSpout("WebLogSpout", new WebLogSpout(),1);

builder.setBolt("WebLogBolt", new WebLogBolt(), 2).allGrouping("WebLogSpout");

Every bolt task receives the same data.

Thread-43-WebLogBolt-executor[1 1]lines:30  session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678

Thread-23-WebLogBolt-executor[2 2]lines:30  session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678

         (5) Spout parallelism set to 1, bolt parallelism set to 2, globalGrouping:

builder.setSpout("WebLogSpout", new WebLogSpout(),1);

builder.setBolt("WebLogBolt", new WebLogBolt(), 2).globalGrouping("WebLogSpout");

The bolt task with the lowest task id receives all of the data.

Thread-28-WebLogBolt-executor[1 1]lines:30  session_id:VVVYH6Y4V4SFXZ56JIPDPB4V678

5.3 Parallelism

5.3.1 Scenario Analysis

1) Single-threaded: arithmetic (add/subtract/multiply/divide), global aggregation

2) Multi-threaded: local (per-thread) arithmetic, persisting to a DB, and the like

(1) Question: how would you compute, under high concurrency, both the total number of words and the number of distinct words?

The former is a count of all lines; the latter is a count of deduplicated words.

The analogous production scenario: computing a website's PV and UV.

(2) The two most commonly used website metrics:

PV (page views): count(session_id), i.e. the total number of page views.

UV (unique visitors): count(distinct session_id), i.e. the number of distinct visitors.

a) Measuring UV by IP address

This is the number of distinct IP addresses that visit a site or click a page. Within a single day, UV counts only the first visit from a given independent IP; later visits from the same IP on that day are not counted again.

b) Measuring UV with cookies

When a client visits a website server for the first time, the server hands the client a cookie, usually stored on the client's machine. The cookie carries a globally unique id, along with some information about the visits, such as access times and which pages were viewed. On the next visit, the server locates the cookie it previously placed on that machine and updates it, but the unique id never changes.

Real-time processing workloads mainly fall into two types: aggregation (e.g. website PV, sales totals, order counts) and deduplication (e.g. website UV, distinct customers, distinct products sold). The sketch below contrasts the two kinds of counts.
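The difference between the two counts can be shown outside Storm in a few lines. A minimal sketch (the session ids are made up):

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class PvUvSketch {
    public static void main(String[] args) {
        List<String> sessionIds = Arrays.asList("s1", "s2", "s1", "s3", "s2", "s1");
        long pv = sessionIds.size();                    // count(session_id)
        long uv = new HashSet<>(sessionIds).size();     // count(distinct session_id)
        System.out.println("pv=" + pv + ", uv=" + uv);  // pv=6, uv=3
    }
}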

5.3.2 Parallelism

Parallelism: a component defined by the user can be executed by multiple threads; the parallelism equals the number of executor threads.

A task is the concrete processing-logic object. One executor thread can run one or more tasks, though by default each executor runs exactly one. This is why tasks are often equated with executor threads, although strictly speaking they are not the same thing.

The task count is the maximum parallelism: a component's number of tasks never changes, while its number of executors can change at runtime (via the storm rebalance command). tasks >= executors, and the executor count is the actual parallelism. See the sketch below.
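A minimal sketch, reusing the WebLogBolt component from the tests above, of fixing the task count higher than the initial executor count so that the executor count can be raised later:

builder.setBolt("WebLogBolt", new WebLogBolt(), 2)  // 2 executors (threads) to start with
       .setNumTasks(4)                              // 4 tasks: the ceiling for any later rebalance
       .shuffleGrouping("WebLogSpout");

The running topology can then be scaled from the command line, e.g. moving to 2 workers and 4 executors for WebLogBolt after a 10-second wait (the topology name mytopology is illustrative):

storm rebalance mytopology -w 10 -n 2 -e WebLogBolt=4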

5.4 Hands-On Cases

5.4.1 Real-Time Word Count

1) Requirement

       Count, in real time, the total occurrences of the words emitted into Storm.

2) Analysis

Design a topology that counts how often each word occurs in the input.

The topology consists of three parts:

(1) WordCountSpout: the data source; it emits one randomly chosen sentence at a time from a fixed set of English sentences.

(2) WordCountSplitBolt: splits each line of text (a sentence) into words.

(3) WordCountBolt: accumulates the count of each word.

(1) Create the spout

package com.atguigu.storm.wordcount;

import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class WordCountSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;

    @Override
    public void nextTuple() {
        // 1 emit a mock sentence
        collector.emit(new Values("i am ximen love jinlian"));

        // 2 sleep for 2 seconds
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // a single output field; the field name is arbitrary
        declarer.declare(new Fields("love"));
    }
}

       (2) Create the word-splitting bolt

package com.atguigu.storm.wordcount;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountSplitBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    @Override
    public void execute(Tuple input) {
        // 1 get the incoming line
        String line = input.getString(0);

        // 2 split it into words
        String[] arrWords = line.split(" ");

        // 3 emit each word with a count of 1
        for (String word : arrWords) {
            collector.emit(new Values(word, 1));
        }
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }
}

       (3) Create the word-count aggregation bolt

package com.atguigu.storm.wordcount;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class WordCountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private Map<String, Integer> map = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple input) {
        // 1 get the incoming word and count
        String word = input.getString(0);
        Integer num = input.getInteger(1);

        // 2 accumulate the word's count
        if (map.containsKey(word)) {
            Integer count = map.get(word);
            map.put(word, count + num);
        } else {
            map.put(word, num);
        }

        System.err.println(Thread.currentThread().getId() + "  word:" + word + "  num:" + map.get(word));
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // terminal bolt: nothing to emit
    }
}

       (4) Create the topology driver (main)

package com.atguigu.storm.wordcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountMain {

    public static void main(String[] args) {
        // 1 build the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("WordCountSpout", new WordCountSpout(), 1);
        builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 2).shuffleGrouping("WordCountSpout");
        builder.setBolt("WordCountBolt", new WordCountBolt(), 4).fieldsGrouping("WordCountSplitBolt", new Fields("word"));

        // 2 create a Config and set the number of workers for this topology
        Config conf = new Config();
        conf.setNumWorkers(2);

        // 3 submit the job: two modes, local and cluster
        if (args.length > 0) {
            try {
                // 4 distributed submission
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // 5 local-mode submission
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcounttopology", conf, builder.createTopology());
        }
    }
}

       (5) Test

Thread 132 processes only the words am and love; thread 169 processes i, ximen, and jinlian. This is the benefit of fields grouping.

132  word:am  num:1

132  word:love  num:1

169  word:i  num:1

169  word:ximen  num:1

169  word:jinlian  num:1

5.4.2 Real-Time Website PV

0) Background knowledge (see 5.3.1)

1) Requirement

       Compute the website's PV in real time.

2) Analysis

Option 1:

Define a static long pv and guard the accumulation with synchronized. (Not feasible.)

Reason: synchronized and Lock are effective within a single JVM, but not across multiple JVMs.

Option 2:

Under shuffleGrouping, report pv multiplied by the executor parallelism.

Configure the driver as follows:

builder.setSpout("PVSpout", new PVSpout(), 1);

builder.setBolt("PVBolt1", new PVBolt1(), 4).shuffleGrouping("PVSpout");

And when printing inside PVBolt1:

System.err.println("threadid:" + Thread.currentThread().getId() + "  pv:" + pv*4);

This works because shuffleGrouping distributes tuples round-robin: with 4 executors, each local pv is roughly a quarter of the total, so pv * 4 approximates the global count.

Pros: simple, very little computation.

Cons: slightly inaccurate, but acceptable in the vast majority of scenarios.

Option 3:

PVBolt1 performs partial aggregation with multiple executors, and PVSumBolt performs the global aggregation in a single thread.

Thread safety: the multithreaded result must match what a single thread would produce.

Pros: exact. With fieldsGrouping you can additionally obtain intermediate values, such as the PV of a single user (visit depth and the like).

Cons: slightly more computation, and one extra bolt.

3) Implementation

(1) Create the data source, PVSpout

package com.atguigu.storm.pv;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class PVSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;
    private BufferedReader reader;
    private String str;

    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            reader = new BufferedReader(new InputStreamReader(new FileInputStream("e:/website.log"), "UTF-8"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
        try {
            if (reader != null) {
                reader.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void nextTuple() {
        // emit one log line every 500 ms until the file is exhausted
        try {
            while ((str = reader.readLine()) != null) {
                collector.emit(new Values(str));
                Thread.sleep(500);
            }
        } catch (Exception e) {
            // ignored in this demo
        }
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

(2) Create the processing bolt, PVBolt1

package com.atguigu.storm.pv;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class PVBolt1 implements IRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    private long pv = 0;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // get the incoming log line
        String logline = input.getString(0);

        // extract the session id
        String session_id = logline.split(" ")[1];

        // count one page view per log line
        if (session_id != null) {
            pv++;
        }

        // emit the local count, keyed by this executor's thread id
        collector.emit(new Values(Thread.currentThread().getId(), pv));
        System.err.println("threadid:" + Thread.currentThread().getId() + "  pv:" + pv);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("threadID", "pv"));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

(3) Create PVSumBolt

package com.atguigu.storm.pv;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class PVSumBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;
    private Map<Long, Long> counts = new HashMap<>();

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        // keep the latest local count per upstream thread, then sum them all
        Long threadID = input.getLong(0);
        Long pv = input.getLong(1);
        counts.put(threadID, pv);

        long word_sum = 0;
        Iterator<Long> iterator = counts.values().iterator();
        while (iterator.hasNext()) {
            word_sum += iterator.next();
        }
        System.err.println("pv_all:" + word_sum);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

(4) Driver

package com.atguigu.storm.pv;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class PVMain {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("PVSpout", new PVSpout(), 1);
        builder.setBolt("PVBolt1", new PVBolt1(), 4).shuffleGrouping("PVSpout");
        builder.setBolt("PVSumBolt", new PVSumBolt(), 1).shuffleGrouping("PVBolt1");

        Config conf = new Config();
        conf.setNumWorkers(2);

        if (args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("pvtopology", conf, builder.createTopology());
        }
    }
}

(5) Test: running the program produces output like the following

threadid:161  pv:1

pv_all:1

threadid:164  pv:1

pv_all:2

threadid:161  pv:2

pv_all:3

threadid:172  pv:1

pv_all:4

threadid:164  pv:2

pv_all:5

threadid:164  pv:3

pv_all:6

threadid:162  pv:1

pv_all:7

threadid:161  pv:3

pv_all:8

threadid:172  pv:2

pv_all:9

threadid:164  pv:4

pv_all:10

threadid:162  pv:2

pv_all:11

threadid:172  pv:3

pv_all:12

threadid:164  pv:5

pv_all:13

threadid:164  pv:6

pv_all:14

threadid:161  pv:4

pv_all:15

threadid:161  pv:5

pv_all:16

threadid:164  pv:7

pv_all:17

threadid:172  pv:4

pv_all:18

threadid:172  pv:5

pv_all:19

threadid:161  pv:6

pv_all:20

threadid:162  pv:3

pv_all:21

threadid:164  pv:8

pv_all:22

threadid:172  pv:6

pv_all:23

threadid:164  pv:9

pv_all:24

threadid:161  pv:7

pv_all:25

threadid:162  pv:4

pv_all:26

threadid:162  pv:5

pv_all:27

threadid:162  pv:6

pv_all:28

threadid:164  pv:10

pv_all:29

threadid:161  pv:8

pv_all:30

5.4.3 Real-Time Website UV (Deduplication)

1) Requirement:

       Compute the website's UV.

2) Analysis

Option 1:

Put the IPs into a Set, which deduplicates automatically, and take Set.size() as the UV. (Not feasible in a distributed application.)

Option 2:

UVBolt1 performs multithreaded partial aggregation (via fieldsGrouping), and the next-level UVSumBolt performs single-threaded global deduplication and aggregation, counting UV by IP address.

Since we are deduplicating, the data must be persisted somewhere:

(1) in memory: a Map data structure

(2) in a distributed NoSQL database such as HBase

3) Implementation

       (1) Create GenerateData, a generator for log data containing IP addresses

package com.atguigu.storm.uv;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;

public class GenerateData {

    public static void main(String[] args) {
        // create the log file if it does not exist yet
        File logFile = new File("e:/website.log");
        if (!logFile.exists()) {
            try {
                logFile.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        Random random = new Random();

        // 1 host names
        String[] hosts = { "www.atguigu.com" };

        // 2 session ids
        String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34",
                "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };

        // 3 visit times
        String[] time = { "2017-08-07 08:40:50", "2017-08-07 08:40:51", "2017-08-07 08:40:52", "2017-08-07 08:40:53",
                "2017-08-07 09:40:49", "2017-08-07 10:40:49", "2017-08-07 11:40:49", "2017-08-07 12:40:49" };

        // 4 visitor ip addresses
        String[] ip = { "192.168.1.101", "192.168.1.102", "192.168.1.103", "192.168.1.104", "192.168.1.105",
                "192.168.1.106", "192.168.1.107", "192.168.1.108" };

        // build 30 random records, one per line, so the spout can read the file line by line
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < 30; i++) {
            sb.append(hosts[0] + " " + session_id[random.nextInt(5)] + " " + time[random.nextInt(8)] + " "
                    + ip[random.nextInt(8)] + "\n");
        }

        // write the data
        try {
            FileOutputStream fs = new FileOutputStream(logFile);
            fs.write(sb.toString().getBytes());
            fs.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

       (2) Create UVSpout to read the data

package com.atguigu.storm.uv;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class UVSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;
    private BufferedReader reader;
    private String str;

    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            reader = new BufferedReader(new InputStreamReader(new FileInputStream("e:/website.log"), "UTF-8"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
        try {
            if (reader != null) {
                reader.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void nextTuple() {
        try {
            while ((str = reader.readLine()) != null) {
                collector.emit(new Values(str));
                Thread.sleep(500);
            }
        } catch (Exception e) {
            // ignored in this demo
        }
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

       (3) Create UVBolt1

package com.atguigu.storm.uv;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class UVBolt1 extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // 1 get the incoming log line
        String line = input.getString(0);

        // 2 split out the ip: each record is "host session_id date time ip"
        String[] splits = line.split(" ");
        String ip = splits[4];

        // 3 emit the ip with a count of 1
        collector.emit(new Values(ip, 1));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }
}

       (4) Create UVSumBolt

package com.atguigu.storm.uv;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class UVSumBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private Map<String, Integer> map = new HashMap<String, Integer>();

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        // 1 get the incoming ip and count
        String ip = input.getString(0);
        Integer num = input.getInteger(1);

        // 2 accumulate per ip; map.size() is the deduplicated UV
        if (map.containsKey(ip)) {
            Integer count = map.get(ip);
            map.put(ip, count + num);
        } else {
            map.put(ip, num);
        }
        System.err.println(Thread.currentThread().getId() + "  ip:" + ip + "  num:" + map.get(ip));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

       (5) Create the UVMain driver

package com.atguigu.storm.uv;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class UVMain {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("UVSpout", new UVSpout(), 1);
        builder.setBolt("UVBolt1", new UVBolt1(), 4).shuffleGrouping("UVSpout");
        builder.setBolt("UVSumBolt", new UVSumBolt(), 1).shuffleGrouping("UVBolt1");

        Config conf = new Config();
        conf.setNumWorkers(2);

        if (args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("uvtopology", conf, builder.createTopology());
        }
    }
}

       (6) Test

136  ip:192.168.1.101  num:1

136  ip:192.168.1.107  num:1

136  ip:192.168.1.108  num:1

136  ip:192.168.1.103  num:1

136  ip:192.168.1.105  num:1

136  ip:192.168.1.102  num:1

136  ip:192.168.1.103  num:2

136  ip:192.168.1.101  num:2

136  ip:192.168.1.107  num:2

136  ip:192.168.1.105  num:2

136  ip:192.168.1.101  num:3

136  ip:192.168.1.108  num:2

136  ip:192.168.1.108  num:3

136  ip:192.168.1.105  num:3

136  ip:192.168.1.101  num:4

136  ip:192.168.1.108  num:4

136  ip:192.168.1.102  num:2

136  ip:192.168.1.104  num:1

136  ip:192.168.1.103  num:3

136  ip:192.168.1.106  num:1

136  ip:192.168.1.105  num:4

136  ip:192.168.1.101  num:5

136  ip:192.168.1.102  num:3

136  ip:192.168.1.108  num:5

136  ip:192.168.1.103  num:4

136  ip:192.168.1.107  num:3

136  ip:192.168.1.102  num:4

136  ip:192.168.1.104  num:2

136  ip:192.168.1.105  num:5

136  ip:192.168.1.104  num:3

Test result: 8 distinct IPs in total, i.e. UV = 8. Visit counts: .101: 5; .102: 4; .103: 4; .104: 3; .105: 5; .106: 1; .107: 3; .108: 5.

Original post: https://www.cnblogs.com/tesla-turing/p/13699100.html