Topology Design in Storm

I: Storm's design model

1.Topology

  A topology is Storm's abstraction of a job: it decomposes a real-time data-analysis task into stages and models them as a directed graph.

  Vertices: the computing components, spouts and bolts.

  Edges: the data flow; tuples travel from one component to the next, and each edge has a direction.

2.Tuple

  Storm wraps each record in a tuple: essentially an ordered sequence of key-value pairs, which makes it easy for components to fetch fields from the data.
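To make the "ordered key-value pairs" idea concrete, here is a toy stand-in for a tuple in plain Java. This is only an illustration of the concept; `SimpleTuple` is my own name, and Storm's real `Tuple` API is much richer.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of a tuple: an ordered list of values paired with an ordered
// list of field names, so a value can be fetched by field name.
class SimpleTuple {
    private final List<String> fields; // names declared with Fields(...) in Storm
    private final List<Object> values; // values emitted with Values(...) in Storm

    SimpleTuple(List<String> fields, List<Object> values) {
        this.fields = fields;
        this.values = values;
    }

    // look up the position of the field name, then return the value there
    Object getValueByField(String field) {
        return values.get(fields.indexOf(field));
    }
}
```

This mirrors how a bolt later calls `tuple.getStringByField("sentence")`: the spout declared the field order once, and every emitted tuple follows it.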

 

3.Spout

  The data collector.

  How does an endless stream of log records get into the topology for processing?

  The spout pulls data from the source, does some light preprocessing, wraps each record in a tuple, and emits it to the downstream bolts.

 

4.Bolt

  The data processor: a bolt receives tuples from upstream components, processes them, and can emit new tuples downstream.

  

II: Developing the WordCount example

1.Sketch the vertex-and-edge diagram of the whole topology

  

2.Project structure

  

3.Modify the pom file

  One thing to watch out for: on the cluster the Storm jars are already present, so they must not be bundled into your jar; enable the provided scope before packaging.

  

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cj.it</groupId>
    <artifactId>storm</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hbase.version>0.98.6-cdh5.3.6</hbase.version>
        <hdfs.version>2.5.0-cdh5.3.6</hdfs.version>
        <storm.version>0.9.6</storm.version>
    </properties>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!-- comment out the line below when running in the IDE; uncomment it when packaging for the cluster -->
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hbase</artifactId>
            <version>${storm.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-hdfs</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.hbase</groupId>
                    <artifactId>hbase-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hdfs.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>cz.mallat.uasparser</groupId>
            <artifactId>uasparser</artifactId>
            <version>0.6.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/src.xml</descriptor>
                    </descriptors>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind to the packaging phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
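Rather than commenting `<scope>provided</scope>` in and out by hand before every build, the scope can be driven by a property and switched with a Maven profile. The sketch below is my own suggestion, not part of the original project; the `cluster` profile id and `storm.scope` property are hypothetical names.

```xml
<!-- In <properties>: the default scope, used when running from the IDE -->
<properties>
    <storm.scope>compile</storm.scope>
</properties>

<!-- In the storm-core dependency, reference the property -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>${storm.version}</version>
    <scope>${storm.scope}</scope>
</dependency>

<!-- New profile: build for the cluster with `mvn package -Pcluster`,
     which marks storm-core as provided so it is left out of the fat jar -->
<profiles>
    <profile>
        <id>cluster</id>
        <properties>
            <storm.scope>provided</storm.scope>
        </properties>
    </profile>
</profiles>
```

With this in place, a plain `mvn package` keeps the IDE-friendly behavior and the pom never needs manual edits.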

4.src.xml

<?xml version="1.0" encoding="UTF-8"?>
<assembly
    xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id>jar-with-dependencies</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <unpack>false</unpack>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
    <fileSets>
        <fileSet>
            <directory>/lib</directory>
        </fileSet>
    </fileSets>
</assembly>

5.log4j.properties

  (Note: the logger category com.ibeifeng below does not match the com.jun.it package used by the code; point it at your own package if you want per-package log levels.)

log4j.rootLogger=info,console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.SimpleLayout

log4j.logger.com.ibeifeng=INFO

6.SentenceSpout.java

package com.jun.it;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Random;


public class SentenceSpout extends BaseRichSpout {
    private static final Logger logger = LoggerFactory.getLogger(SentenceSpout.class);
    private SpoutOutputCollector collector;
    // sample data to emit
    private static final String[] SENTENCES = {
        "hadoop oozie storm hive",
        "hadoop spark sqoop hbase",
        "error flume yarn mapreduce"
    };

    // initialize the collector
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    // declare the output field name
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    // assemble and emit a tuple
    @Override
    public void nextTuple() {
        String sentence = SENTENCES[new Random().nextInt(SENTENCES.length)];
        if (sentence.contains("error")) {
            logger.error("bad record: " + sentence);
        } else {
            this.collector.emit(new Values(sentence));
        }
        try {
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public SentenceSpout() {
        super();
    }

    @Override
    public void close() {

    }

    @Override
    public void activate() {
        super.activate();
    }

    @Override
    public void deactivate() {
        super.deactivate();
    }

    @Override
    public void ack(Object msgId) {
        super.ack(msgId);
    }

    @Override
    public void fail(Object msgId) {
        super.fail(msgId);
    }

}

7.SplitBolt.java

package com.jun.it;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitBolt implements IRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    // split each sentence on spaces and emit one tuple per word
    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        if (sentence != null && !"".equals(sentence)) {
            String[] words = sentence.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word));
            }
        }
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

8.CountBolt.java

package com.jun.it;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class CountBolt implements IRichBolt {
    private Map<String, Integer> counts;
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        counts = new HashMap<>();
    }

    // accumulate a running count per word and emit (word, count)
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        int count = 1;
        if (counts.containsKey(word)) {
            count = counts.get(word) + 1;
        }
        counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
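Stripped of the Storm plumbing, the counting logic above reduces to an in-memory map update. A minimal standalone sketch (`WordCounter` is my own name, and `getOrDefault` is an equivalent of the `containsKey` branch):

```java
import java.util.HashMap;
import java.util.Map;

// The heart of CountBolt: bump a per-word counter and return the word's new
// running total, which is what execute() emits as (word, count).
class WordCounter {
    private final Map<String, Integer> counts = new HashMap<>();

    int add(String word) {
        int count = counts.getOrDefault(word, 0) + 1;
        counts.put(word, count);
        return count;
    }
}
```

Keeping the counts in a plain local HashMap is only correct because the topology later routes words with fieldsGrouping, which guarantees that every occurrence of a given word reaches the same CountBolt task.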

9.PrintBolt.java

package com.jun.it;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import java.util.Map;

public class PrintBolt implements IRichBolt {
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");
        System.out.println("word:" + word + ", count:" + count);
    }

    @Override
    public void cleanup() {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

10.WordCountTopology.java

package com.jun.it;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {
    private static final String SENTENCE_SPOUT = "sentenceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String PRINT_BOLT = "printBolt";

    public static void main(String[] args) {
        // wire the graph: spout -> split -> count -> print
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout(SENTENCE_SPOUT, new SentenceSpout());
        topologyBuilder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(SENTENCE_SPOUT);
        // fieldsGrouping routes the same word to the same CountBolt task
        topologyBuilder.setBolt(COUNT_BOLT, new CountBolt()).fieldsGrouping(SPLIT_BOLT, new Fields("word"));
        topologyBuilder.setBolt(PRINT_BOLT, new PrintBolt()).globalGrouping(COUNT_BOLT);
        Config config = new Config();
        if (args == null || args.length == 0) {
            // no args: run inside an in-process local cluster
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcount", config, topologyBuilder.createTopology());
        } else {
            // args given: submit to the real cluster under the name args[0]
            config.setNumWorkers(1);
            try {
                StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }
}

III: Running locally

1.Prerequisite

  I originally assumed Storm had to be started first; it turns out no Storm daemons are needed for local mode.

  Just run the main method.

2.Results

  

IV: Running on the cluster

1.Package in IDEA

  Use the jar that bundles the dependencies (storm-1.0-SNAPSHOT-jar-with-dependencies.jar).

 

2.Upload the jar to /opt/datas

  

3.Run

   bin/storm jar /opt/datas/storm-1.0-SNAPSHOT-jar-with-dependencies.jar com.jun.it.WordCountTopology wordcount

  

4.The Storm UI

  

Original post: https://www.cnblogs.com/juncaoit/p/6351492.html