storm的代码实现

先模拟产生一些数据

 

下面摘录其中一部分输出(前两行是 Kafka 客户端的启动日志,其余每行是一条模拟的送礼日志):

  1 2017-06-10 18:25:56,092 [main] [org.apache.kafka.common.utils.AppInfoParser] [INFO] - Kafka version : 0.9.0.1
  2 2017-06-10 18:25:56,092 [main] [org.apache.kafka.common.utils.AppInfoParser] [INFO] - Kafka commitId : 23c69d62a0cabf06
  3 {"id":"865456863256327","vid":"1495267869123452","uid":"964226522333222","gold":3936,"platform":"ios","timestamp":1497090356094}
  4 {"id":"865456863256323","vid":"1495267869123454","uid":"964226522333224","gold":6824,"platform":"android","timestamp":1497090356194}
  5 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":9389,"platform":"ios","timestamp":1497090356294}
  6 {"id":"865456863256326","vid":"1495267869123452","uid":"964226522333222","gold":3054,"platform":"ios","timestamp":1497090356394}
  7 {"id":"865456863256329","vid":"1495267869123454","uid":"964226522333224","gold":1518,"platform":"android","timestamp":1497090356494}
  8 {"id":"865456863256324","vid":"1495267869123452","uid":"964226522333222","gold":7668,"platform":"ios","timestamp":1497090356594}
  9 {"id":"865456863256321","vid":"1495267869123454","uid":"964226522333224","gold":1665,"platform":"android","timestamp":1497090356694}
 10 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":1727,"platform":"ios","timestamp":1497090356794}
 11 {"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":6371,"platform":"ios","timestamp":1497090356894}
 12 {"id":"865456863256328","vid":"1495267869123452","uid":"964226522333222","gold":495,"platform":"android","timestamp":1497090356994}
 13 {"id":"865456863256323","vid":"1495267869123454","uid":"964226522333224","gold":7543,"platform":"ios","timestamp":1497090417094}
 14 {"id":"865456863256322","vid":"1495267869123454","uid":"964226522333224","gold":1901,"platform":"android","timestamp":1497090417194}
 15 {"id":"865456863256329","vid":"1495267869123452","uid":"964226522333222","gold":8043,"platform":"ios","timestamp":1497090417294}
 16 {"id":"865456863256321","vid":"1495267869123452","uid":"964226522333222","gold":9325,"platform":"ios","timestamp":1497090417394}
 17 {"id":"865456863256327","vid":"1495267869123452","uid":"964226522333222","gold":4408,"platform":"android","timestamp":1497090417494}
 18 {"id":"865456863256320","vid":"1495267869123450","uid":"964226522333220","gold":8715,"platform":"android","timestamp":1497090417594}
 19 {"id":"865456863256321","vid":"1495267869123450","uid":"964226522333220","gold":592,"platform":"ios","timestamp":1497090417694}
 20 {"id":"865456863256321","vid":"1495267869123450","uid":"964226522333220","gold":4319,"platform":"android","timestamp":1497090417794}
 21 {"id":"865456863256321","vid":"1495267869123453","uid":"964226522333223","gold":416,"platform":"ios","timestamp":1497090417894}
 22 {"id":"865456863256329","vid":"1495267869123454","uid":"964226522333224","gold":4410,"platform":"android","timestamp":1497090417994}
 23 {"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":7197,"platform":"ios","timestamp":1497090478095}
 24 {"id":"865456863256327","vid":"1495267869123451","uid":"964226522333221","gold":1737,"platform":"ios","timestamp":1497090478195}
 25 {"id":"865456863256324","vid":"1495267869123453","uid":"964226522333223","gold":2425,"platform":"android","timestamp":1497090478295}
 26 {"id":"865456863256326","vid":"1495267869123454","uid":"964226522333224","gold":6847,"platform":"ios","timestamp":1497090478395}
 27 {"id":"865456863256322","vid":"1495267869123454","uid":"964226522333224","gold":1932,"platform":"android","timestamp":1497090478495}
 28 {"id":"865456863256324","vid":"1495267869123454","uid":"964226522333224","gold":4428,"platform":"ios","timestamp":1497090478595}
 29 {"id":"865456863256320","vid":"1495267869123453","uid":"964226522333223","gold":3708,"platform":"android","timestamp":1497090478695}
 30 {"id":"865456863256321","vid":"1495267869123452","uid":"964226522333222","gold":5290,"platform":"ios","timestamp":1497090478795}
 31 {"id":"865456863256328","vid":"1495267869123452","uid":"964226522333222","gold":5080,"platform":"android","timestamp":1497090478895}
 32 {"id":"865456863256321","vid":"1495267869123453","uid":"964226522333223","gold":9643,"platform":"android","timestamp":1497090478995}
 33 {"id":"865456863256324","vid":"1495267869123452","uid":"964226522333222","gold":3766,"platform":"ios","timestamp":1497090539095}
 34 {"id":"865456863256326","vid":"1495267869123451","uid":"964226522333221","gold":3758,"platform":"android","timestamp":1497090539195}
 35 {"id":"865456863256328","vid":"1495267869123451","uid":"964226522333221","gold":2522,"platform":"android","timestamp":1497090539295}
 36 {"id":"865456863256322","vid":"1495267869123450","uid":"964226522333220","gold":8746,"platform":"android","timestamp":1497090539395}
 37 {"id":"865456863256328","vid":"1495267869123451","uid":"964226522333221","gold":7616,"platform":"ios","timestamp":1497090539495}
 38 {"id":"865456863256325","vid":"1495267869123454","uid":"964226522333224","gold":527,"platform":"android","timestamp":1497090539595}
 39 {"id":"865456863256327","vid":"1495267869123451","uid":"964226522333221","gold":3887,"platform":"ios","timestamp":1497090539695}
 40 {"id":"865456863256325","vid":"1495267869123450","uid":"964226522333220","gold":2137,"platform":"ios","timestamp":1497090539795}
 41 {"id":"865456863256329","vid":"1495267869123453","uid":"964226522333223","gold":6965,"platform":"android","timestamp":1497090539895}
 42 {"id":"865456863256325","vid":"1495267869123451","uid":"964226522333221","gold":350,"platform":"android","timestamp":1497090539995}
 43 {"id":"865456863256323","vid":"1495267869123454","uid":"964226522333224","gold":863,"platform":"android","timestamp":1497090600096}
 44 {"id":"865456863256320","vid":"1495267869123454","uid":"964226522333224","gold":9597,"platform":"ios","timestamp":1497090600196}
 45 {"id":"865456863256324","vid":"1495267869123454","uid":"964226522333224","gold":9504,"platform":"ios","timestamp":1497090600296}
 46 {"id":"865456863256322","vid":"1495267869123451","uid":"964226522333221","gold":1598,"platform":"ios","timestamp":1497090600396}
 47 {"id":"865456863256325","vid":"1495267869123451","uid":"964226522333221","gold":1126,"platform":"android","timestamp":1497090600496}
 48 {"id":"865456863256324","vid":"1495267869123453","uid":"964226522333223","gold":3606,"platform":"android","timestamp":1497090600596}
 49 {"id":"865456863256326","vid":"1495267869123450","uid":"964226522333220","gold":1866,"platform":"ios","timestamp":1497090600696}
 50 {"id":"865456863256323","vid":"1495267869123453","uid":"964226522333223","gold":1282,"platform":"android","timestamp":1497090600796}
 51 {"id":"865456863256325","vid":"1495267869123450","uid":"964226522333220","gold":542,"platform":"ios","timestamp":1497090600896}
 52 {"id":"865456863256326","vid":"1495267869123450","uid":"964226522333220","gold":4168,"platform":"android","timestamp":1497090600996}
 53 {"id":"865456863256321","vid":"1495267869123453","uid":"964226522333223","gold":4766,"platform":"android","timestamp":1497090661096}
 54 {"id":"865456863256323","vid":"1495267869123451","uid":"964226522333221","gold":3867,"platform":"ios","timestamp":1497090661196}
 55 {"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":7825,"platform":"ios","timestamp":1497090661296}
 56 {"id":"865456863256320","vid":"1495267869123454","uid":"964226522333224","gold":4518,"platform":"ios","timestamp":1497090661396}
 57 {"id":"865456863256326","vid":"1495267869123453","uid":"964226522333223","gold":4280,"platform":"ios","timestamp":1497090661496}
 58 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":4909,"platform":"android","timestamp":1497090661596}
 59 {"id":"865456863256325","vid":"1495267869123452","uid":"964226522333222","gold":7227,"platform":"ios","timestamp":1497090661696}
 60 {"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":9937,"platform":"android","timestamp":1497090661796}
 61 {"id":"865456863256321","vid":"1495267869123451","uid":"964226522333221","gold":7840,"platform":"ios","timestamp":1497090661896}
 62 {"id":"865456863256326","vid":"1495267869123453","uid":"964226522333223","gold":2762,"platform":"ios","timestamp":1497090661996}
 63 {"id":"865456863256322","vid":"1495267869123454","uid":"964226522333224","gold":7941,"platform":"ios","timestamp":1497090722097}
 64 {"id":"865456863256320","vid":"1495267869123452","uid":"964226522333222","gold":6188,"platform":"android","timestamp":1497090722197}
 65 {"id":"865456863256325","vid":"1495267869123454","uid":"964226522333224","gold":2387,"platform":"android","timestamp":1497090722297}
 66 {"id":"865456863256322","vid":"1495267869123450","uid":"964226522333220","gold":2980,"platform":"ios","timestamp":1497090722397}
 67 {"id":"865456863256321","vid":"1495267869123452","uid":"964226522333222","gold":9403,"platform":"android","timestamp":1497090722497}
 68 {"id":"865456863256323","vid":"1495267869123453","uid":"964226522333223","gold":3482,"platform":"android","timestamp":1497090722597}
 69 {"id":"865456863256324","vid":"1495267869123454","uid":"964226522333224","gold":3290,"platform":"android","timestamp":1497090722697}
 70 {"id":"865456863256323","vid":"1495267869123454","uid":"964226522333224","gold":1439,"platform":"android","timestamp":1497090722797}
 71 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":6758,"platform":"ios","timestamp":1497090722897}
 72 {"id":"865456863256327","vid":"1495267869123452","uid":"964226522333222","gold":3501,"platform":"ios","timestamp":1497090722997}
 73 {"id":"865456863256325","vid":"1495267869123454","uid":"964226522333224","gold":7904,"platform":"ios","timestamp":1497090783097}
 74 {"id":"865456863256326","vid":"1495267869123453","uid":"964226522333223","gold":9900,"platform":"android","timestamp":1497090783197}
 75 {"id":"865456863256320","vid":"1495267869123452","uid":"964226522333222","gold":1841,"platform":"ios","timestamp":1497090783297}
 76 {"id":"865456863256322","vid":"1495267869123453","uid":"964226522333223","gold":8857,"platform":"ios","timestamp":1497090783397}
 77 {"id":"865456863256328","vid":"1495267869123450","uid":"964226522333220","gold":7855,"platform":"android","timestamp":1497090783497}
 78 {"id":"865456863256324","vid":"1495267869123451","uid":"964226522333221","gold":7165,"platform":"android","timestamp":1497090783597}
 79 {"id":"865456863256326","vid":"1495267869123450","uid":"964226522333220","gold":2247,"platform":"ios","timestamp":1497090783697}
 80 {"id":"865456863256329","vid":"1495267869123454","uid":"964226522333224","gold":1742,"platform":"android","timestamp":1497090783797}
 81 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":9122,"platform":"ios","timestamp":1497090783897}
 82 {"id":"865456863256325","vid":"1495267869123453","uid":"964226522333223","gold":1623,"platform":"android","timestamp":1497090783997}
 83 {"id":"865456863256324","vid":"1495267869123450","uid":"964226522333220","gold":8354,"platform":"ios","timestamp":1497090844098}
 84 {"id":"865456863256321","vid":"1495267869123454","uid":"964226522333224","gold":3808,"platform":"ios","timestamp":1497090844198}
 85 {"id":"865456863256326","vid":"1495267869123451","uid":"964226522333221","gold":9875,"platform":"android","timestamp":1497090844298}
 86 {"id":"865456863256327","vid":"1495267869123452","uid":"964226522333222","gold":2714,"platform":"ios","timestamp":1497090844398}
 87 {"id":"865456863256326","vid":"1495267869123454","uid":"964226522333224","gold":3660,"platform":"ios","timestamp":1497090844498}
 88 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":8545,"platform":"ios","timestamp":1497090844598}
 89 {"id":"865456863256325","vid":"1495267869123453","uid":"964226522333223","gold":5757,"platform":"android","timestamp":1497090844698}
 90 {"id":"865456863256320","vid":"1495267869123450","uid":"964226522333220","gold":7898,"platform":"android","timestamp":1497090844798}
 91 {"id":"865456863256329","vid":"1495267869123453","uid":"964226522333223","gold":3633,"platform":"ios","timestamp":1497090844898}
 92 {"id":"865456863256329","vid":"1495267869123452","uid":"964226522333222","gold":6500,"platform":"android","timestamp":1497090844998}
 93 {"id":"865456863256323","vid":"1495267869123450","uid":"964226522333220","gold":8859,"platform":"ios","timestamp":1497090905098}
 94 {"id":"865456863256322","vid":"1495267869123452","uid":"964226522333222","gold":3897,"platform":"android","timestamp":1497090905198}
 95 {"id":"865456863256326","vid":"1495267869123451","uid":"964226522333221","gold":5786,"platform":"ios","timestamp":1497090905298}
 96 {"id":"865456863256321","vid":"1495267869123451","uid":"964226522333221","gold":2667,"platform":"android","timestamp":1497090905398}
 97 {"id":"865456863256321","vid":"1495267869123453","uid":"964226522333223","gold":4038,"platform":"android","timestamp":1497090905499}
 98 {"id":"865456863256328","vid":"1495267869123451","uid":"964226522333221","gold":361,"platform":"android","timestamp":1497090905599}
 99 {"id":"865456863256326","vid":"1495267869123454","uid":"964226522333224","gold":7074,"platform":"android","timestamp":1497090905699}
100 {"id":"865456863256323","vid":"1495267869123451","uid":"964226522333221","gold":89,"platform":"android","timestamp":1497090905799}
101 {"id":"865456863256325","vid":"1495267869123450","uid":"964226522333220","gold":1354,"platform":"android","timestamp":1497090905899}
102 {"id":"865456863256326","vid":"1495267869123452","uid":"964226522333222","gold":221,"platform":"ios","timestamp":1497090905999}
103 {"id":"865456863256325","vid":"1495267869123451","uid":"964226522333221","gold":436,"platform":"android","timestamp":1497090966099}
104 {"id":"865456863256327","vid":"1495267869123451","uid":"964226522333221","gold":8000,"platform":"android","timestamp":1497090966199}
105 {"id":"865456863256324","vid":"1495267869123453","uid":"964226522333223","gold":9952,"platform":"android","timestamp":1497090966299}
106 {"id":"865456863256321","vid":"1495267869123451","uid":"964226522333221","gold":2216,"platform":"android","timestamp":1497090966400}
107 {"id":"865456863256320","vid":"1495267869123452","uid":"964226522333222","gold":2042,"platform":"android","timestamp":1497090966500}
108 {"id":"865456863256329","vid":"1495267869123451","uid":"964226522333221","gold":8739,"platform":"ios","timestamp":1497090966600}
109 {"id":"865456863256322","vid":"1495267869123452","uid":"964226522333222","gold":2500,"platform":"ios","timestamp":1497090966701}
110 {"id":"865456863256323","vid":"1495267869123452","uid":"964226522333222","gold":9803,"platform":"ios","timestamp":1497090966801}
111 {"id":"865456863256328","vid":"1495267869123450","uid":"964226522333220","gold":7246,"platform":"android","timestamp":1497090966901}
112 {"id":"865456863256320","vid":"1495267869123454","uid":"964226522333224","gold":5220,"platform":"android","timestamp":1497090967001}

 参考代码KafkaProducer.java

 1 package yehua.kafkaDemo;
 2 
 3 import java.util.Properties;
 4 import java.util.Random;
 5 
 6 import org.apache.kafka.clients.producer.Producer;
 7 import org.apache.kafka.clients.producer.ProducerRecord;
 8 
 9 public class KafkaProducer {
10     
11     public static void main(String[] args) throws Exception {
12         Properties props = new Properties();  
13         props.put("bootstrap.servers", "master:9092");  
14         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
15         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
16         //String topic = "gold_log_r2p5";
17         String topic = "test";
18         
19         Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);  
20         int count = 0;
21         //{"id":"865456863256326","vid":"1495267869123456","uid":"965406863256326","gold":150,"platform":"ios","timestamp":1495267869}
22         //模拟送礼人id
23         String[] idArr = {"865456863256320","865456863256321","865456863256322","865456863256323","865456863256324","865456863256325","865456863256326","865456863256327","865456863256328","865456863256329"};
24         //模拟直播间视频id
25         String[] vidArr = {"1495267869123450","1495267869123451","1495267869123452","1495267869123453","1495267869123454"};
26         //模拟直播用户id
27         String[] uidArr = {"964226522333220","964226522333221","964226522333222","964226522333223","964226522333224"};
28         //模拟用户手机平台
29         String[] platformArr = {"android","ios"};
30         Random random = new Random();
31         while(true){
32             int rint1 = random.nextInt(10);
33             int rint2 = random.nextInt(5);
34             int rint3 = random.nextInt(2);
35             String log = "{"id":""+idArr[rint1]+"","vid":""+vidArr[rint2]+"","uid":""+uidArr[rint2]+"","gold":"+random.nextInt(10000)+","platform":""+platformArr[rint3]+"","timestamp":"+System.currentTimeMillis()+"}";
36             //producer.send(new ProducerRecord<String, String>(topic, log));  
37             System.out.println(log);
38             count++;
39             Thread.sleep(100);
40             if(count%10 == 0){
41                 //break;
42                 Thread.sleep(1000*60);
43             }
44         }
45     }
46 
47 }
 

 先在kafka创建topic

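topic 名为 gold_log_r2p5,设置 2 个副本、5 个分区,例如:

bin/kafka-topics.sh --create --zookeeper hadoop100:2181 --replication-factor 2 --partitions 5 --topic gold_log_r2p5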

 

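可以看到 topic 创建成功,也可以用下面的命令查看分区和副本的分布:

bin/kafka-topics.sh --describe --zookeeper hadoop100:2181 --topic gold_log_r2p5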

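总结一下前面的流程:

- KafkaProducer 模拟生成送礼日志,写入 Kafka(拓扑消费的 topic 是 gold_log_r2p5)。
- Storm 拓扑通过 KafkaSpout 消费该 topic,由 ParseLogBolt 解析出送礼人所在省份和金币数。
- LogProcessBolt1 每 2 分钟把全网金币消耗总量写入 MySQL 的 result1 表。
- LogProcessBolt2 每分钟把按省份累计的金币消耗写入 result2 表。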

下面再新建一个maven项目stormpProject0521

依赖文件:

 依赖文件pom.xml参考代码

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <!-- Build for the Storm topology project: Storm core, storm-kafka (KafkaSpout), Kafka, JDBC/MySQL,
       fastjson and Jedis. It is packaged as a jar-with-dependencies by the assembly plugin at the end. -->
  <modelVersion>4.0.0</modelVersion>

  <groupId>yehua</groupId>
  <artifactId>stormpProject0521</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>stormpProject0521</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- Needed only at compile time, not at runtime: the Storm cluster already provides it. -->
		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-core</artifactId>
			<version>1.0.2</version>
			<!-- provided: used when compiling the code, but not bundled into the packaged jar -->
			<scope>provided</scope>
		</dependency>
		<!-- Mainly for KafkaSpout -->
		<dependency>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-kafka</artifactId>
			<version>1.0.2</version>
		</dependency>
		<!-- Kafka core (Scala 2.10 build); the kafka.api.OffsetRequest class used by LogProcessTopology comes from here -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.9.0.1</version>
			<!-- Exclude slf4j-log4j12 (presumably to avoid a second SLF4J binding next to Storm's own logging backend; confirm) -->
			<exclusions>
				<exclusion>
					<groupId>org.slf4j</groupId>
	      			<artifactId>slf4j-log4j12</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<!-- Note: starting with Kafka 0.9 the kafka-clients dependency must be declared here explicitly,
			otherwise the job fails with java.lang.NoSuchMethodError:
			org.apache.kafka.common.network.NetworkSend.<init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.9.0.1</version>
		</dependency>
		<!-- commons-collections (collection helpers such as MapUtils) -->
		<dependency>
			<groupId>commons-collections</groupId>
			<artifactId>commons-collections</artifactId>
			<version>3.1</version>
		</dependency>
		<!-- commons-dbutils (Apache DbUtils JDBC helpers) -->
		<dependency>
			<groupId>commons-dbutils</groupId>
			<artifactId>commons-dbutils</artifactId>
			<version>1.6</version>
		</dependency>
		<!-- MySQL JDBC driver -->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.29</version>
		</dependency>
		<!-- JSON parsing (fastjson is used by ParseLogBolt).
			NOTE(review): this is an old fastjson release; check it against current security advisories before production use. -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.10</version>
		</dependency>
		<!-- Redis client (Jedis); presumably backs the RedisUtils helper that ParseLogBolt references only in commented-out code. Confirm. -->
		<dependency>
			<groupId>redis.clients</groupId>
			<artifactId>jedis</artifactId>
			<version>2.9.0</version>
		</dependency>
  </dependencies>
  <build>
		<plugins>
			<!-- Compiler plugin: compile with JDK 1.8 source/target level -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.5.1</version>
				<configuration>
					<encoding>UTF-8</encoding>
					<source>1.8</source>
					<target>1.8</target>
					<showWarnings>true</showWarnings>
				</configuration>
			</plugin>
			<!-- Assembly plugin: during the package phase it also builds a *-jar-with-dependencies.jar that bundles
				all non-provided dependencies (storm-core stays out because its scope is provided).
				NOTE(review): no plugin version is pinned here; consider pinning one for reproducible builds. -->
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<configuration>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

  

参考代码LogProcessTopology.java

 1 package yehua.stormpProject0521;
 2 
 3 import org.apache.storm.Config;
 4 import org.apache.storm.LocalCluster;
 5 import org.apache.storm.StormSubmitter;
 6 import org.apache.storm.generated.AlreadyAliveException;
 7 import org.apache.storm.generated.AuthorizationException;
 8 import org.apache.storm.generated.InvalidTopologyException;
 9 import org.apache.storm.generated.StormTopology;
10 import org.apache.storm.kafka.BrokerHosts;
11 import org.apache.storm.kafka.KafkaSpout;
12 import org.apache.storm.kafka.SpoutConfig;
13 import org.apache.storm.kafka.StringScheme;
14 import org.apache.storm.kafka.ZkHosts;
15 import org.apache.storm.spout.SchemeAsMultiScheme;
16 import org.apache.storm.topology.TopologyBuilder;
17 
18 import yehua.stormpProject0521.bolt.LogProcessBolt1;
19 import yehua.stormpProject0521.bolt.LogProcessBolt2;
20 import yehua.stormpProject0521.bolt.ParseLogBolt;
21 
22 public class LogProcessTopology {
23     
24     public static void main(String[] args) {
25         TopologyBuilder topologyBuilder = new TopologyBuilder();
26         String topology_name = LogProcessTopology.class.getSimpleName();
27         String SPOUT_ID = KafkaSpout.class.getSimpleName();
28         String BOLT_ID_1 = ParseLogBolt.class.getSimpleName();
29         String BOLT_ID_2 = LogProcessBolt1.class.getSimpleName();
30         String BOLT_ID_3 = LogProcessBolt2.class.getSimpleName();
31         
32         
33         
34         
35         BrokerHosts hosts = new ZkHosts("hadoop100:2181");//设置zk地址,为了找到kafka
36         String topic = "gold_log_r2p5";//topic
37         String zkRoot = "/kafkaSpout";//storm会通过kafkaspout消费kafka中的数据,具体消费的offset信息会保存到这个节点下面
38         String id = "consumer_gold_log";//可以理解为groupid
39         SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
40         //表示吧spout输出的数据使用字符串进行解析,这样在bolt中取数据的时候,就可以之间获取字符串了
41         spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
42         //注意:第一次消费数据的话,默认会从topic的最早的数据进行消费
43         //storm通过kafkaspout消费topic里面数据的时候,如果zkRoot中没有保存消费的offset,那么久会根据startOffsetTime的值来消费topic中的数据
44         //spoutConf.startOffsetTime =  kafka.api.OffsetRequest.EarliestTime();/最早的数据
45         spoutConf.startOffsetTime =  kafka.api.OffsetRequest.LatestTime();//最新的数据
46         
47         
48         
49         topologyBuilder.setSpout(SPOUT_ID,new KafkaSpout(spoutConf),5);
50         //可以实现多个kafkaspout
51         //topologyBuilder.setSpout("newSpout",new KafkaSpout(spoutConfNew),5);
52         
53         topologyBuilder.setBolt(BOLT_ID_1, new ParseLogBolt(),2).setNumTasks(6).shuffleGrouping(SPOUT_ID);
54         //LogProcessBolt1  这个bolt只能使用一个线程执行  globalGrouping可以保证数据只让一个线程去处理
55         topologyBuilder.setBolt(BOLT_ID_2, new LogProcessBolt1()).globalGrouping(BOLT_ID_1);
56         topologyBuilder.setBolt(BOLT_ID_3, new LogProcessBolt2(),2).shuffleGrouping(BOLT_ID_1);
57         
58         Config config = new Config();
59         //config.setNumWorkers(2);//使用两个worker
60         config.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx1024m");//给worker指定内存
61         config.setMaxSpoutPending(1000);//限制内存中未处理的tuple个数最多为1000
62         StormTopology createTopology = topologyBuilder.createTopology();
63         
64         if(args.length==0){
65             LocalCluster localCluster = new LocalCluster();
66             localCluster.submitTopology(topology_name, config, createTopology);
67         }else{
68             try {
69                 StormSubmitter.submitTopology(topology_name, config, createTopology);
70             } catch (AlreadyAliveException e) {
71                 e.printStackTrace();
72             } catch (InvalidTopologyException e) {
73                 e.printStackTrace();
74             } catch (AuthorizationException e) {
75                 e.printStackTrace();
76             }
77         }
78         
79     }
80 
81 }

参考代码ParseLogBolt.java

 1 package yehua.stormpProject0521.bolt;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 
 6 import org.apache.storm.Config;
 7 import org.apache.storm.Constants;
 8 import org.apache.storm.task.OutputCollector;
 9 import org.apache.storm.task.TopologyContext;
10 import org.apache.storm.topology.OutputFieldsDeclarer;
11 import org.apache.storm.topology.base.BaseRichBolt;
12 import org.apache.storm.tuple.Fields;
13 import org.apache.storm.tuple.Tuple;
14 import org.apache.storm.tuple.Values;
15 
16 import com.alibaba.fastjson.JSON;
17 import com.alibaba.fastjson.JSONObject;
18 
19 /**
20  * 主要对数据进行解析,把关键字段解析出来,发射出去
21  * @author yehua
22  *
23  */
24 public class ParseLogBolt extends BaseRichBolt {
25     private Map stormConf;
26     private TopologyContext context;
27     private OutputCollector collector;
28     private Map<String, String> idCountryMap;
29     @Override
30     public void prepare(Map stormConf, TopologyContext context,
31             OutputCollector collector) {
32         this.stormConf = stormConf;
33         this.context = context;
34         this.collector = collector;
35         //在初始化的时候从redis中把送礼人id和省份信息加载过来,后期在storm的定时任务中每半个小时同步一次,把新注册用户的信息拉取过来
36         /*RedisUtils redisUtils = new RedisUtils();
37         List<String> list = redisUtils.lrange("all_id_province", 0, -1);
38         for (String id_country : list) {
39             String[] splits = id_country.split("	");
40             idCountryMap.put(splits[0], splits[1]);
41         }
42         redisUtils.close();*/
43         idCountryMap = new HashMap<String, String>();
44         idCountryMap.put("865456863256320", "京");
45         idCountryMap.put("865456863256321", "津");
46         idCountryMap.put("865456863256322", "冀");
47         idCountryMap.put("865456863256323", "晋");
48         idCountryMap.put("865456863256324", "辽");
49         idCountryMap.put("865456863256325", "黑");
50         idCountryMap.put("865456863256326", "沪");
51         idCountryMap.put("865456863256327", "苏");
52         idCountryMap.put("865456863256328", "浙");
53         idCountryMap.put("865456863256329", "皖");
54     }
55 
56     @Override
57     public void execute(Tuple input) {
58         if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
59             //执行定时同步用户静态信息的代码
60             //定时向idCountryMap中更新数据,每次更新只需要把新增的数据读取过来即可,属于增量读取
61             /*RedisUtils redisUtils = new RedisUtils();
62             String poll = redisUtils.poll("new_id_country");
63             while(poll!=null){
64                 String[] splits = poll.split("	");
65                 idCountryMap.put(splits[0], splits[1]);
66                 poll = redisUtils.poll("new_id_country");
67             }*/
68         }else{
69             try {
70                 //String log = new String(input.getBinaryByField("bytes"));
71                 String log = input.getStringByField("str");
72                 JSONObject logObj = JSON.parseObject(log);
73                 String id = logObj.getString("id");
74                 String province = idCountryMap.getOrDefault(id, "其它");//用户省份信息
75                 Integer gold = logObj.getInteger("gold");//金币
76                 this.collector.emit(new Values(province,gold));
77                 this.collector.ack(input);
78             } catch (Exception e) {
79                 this.collector.fail(input);
80             }
81         }
82     }
83 
84     @Override
85     public void declareOutputFields(OutputFieldsDeclarer declarer) {
86         declarer.declare(new Fields("province","gold"));
87     }
88 
89     @Override
90     public Map<String, Object> getComponentConfiguration() {
91         HashMap<String, Object> hashMap = new HashMap<String, Object>();
92         hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60*30);
93         return hashMap;
94     }
95     
96     
97 
98 }

参考代码LogProcessBolt1.java

 1 package yehua.stormpProject0521.bolt;
 2 
 3 import java.sql.Connection;
 4 import java.sql.SQLException;
 5 import java.sql.Statement;
 6 import java.util.Date;
 7 import java.util.HashMap;
 8 import java.util.Map;
 9 
10 import org.apache.storm.Config;
11 import org.apache.storm.Constants;
12 import org.apache.storm.task.OutputCollector;
13 import org.apache.storm.task.TopologyContext;
14 import org.apache.storm.topology.OutputFieldsDeclarer;
15 import org.apache.storm.topology.base.BaseRichBolt;
16 import org.apache.storm.trident.operation.builtin.Sum;
17 import org.apache.storm.tuple.Tuple;
18 
19 import yehua.stormpProject0521.utils.MyDateUtils;
20 import yehua.stormpProject0521.utils.MyDbUtils;
21 
22 /**
23  * 统计一下全网金币消耗数据(2分钟)(折线图)
24  * 每隔两分钟统计一下全网金币消耗数据(2分钟)(折线图)
25  * 1    1526    2017-01-01 00:00:00
26  * 2    2560    2017-01-01 00:02:00
27  * 3    1560    2017-01-01 00:04:00
28  * 4    1960    2017-01-01 00:06:00
29  * @author yehua
30  *
31  */
32 public class LogProcessBolt1 extends BaseRichBolt {
33 
34     @Override
35     public void prepare(Map stormConf, TopologyContext context,
36             OutputCollector collector) {
37         
38     }
39     int sum = 0;
40     private Connection connection = null;
41     @Override
42     public void execute(Tuple input) {
43         if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
44             //定时任务
45             try {
46                 String curr_time = MyDateUtils.formatDate2(new Date());
47                 connection = MyDbUtils.getConnection();
48                 Statement state = connection.createStatement();
49                 state.executeUpdate("insert into result1 (gold,time) values("+sum+",'"+curr_time+"')");
50                 System.out.println("入库成功!");
51                 sum = 0;//注意,需要把sum重置为0
52             } catch (SQLException e) {
53                 System.out.println("执行错误!");
54             }finally{
55                 if(connection!=null){
56                     try {
57                         connection.close();
58                     } catch (SQLException e) {
59                         e.printStackTrace();
60                     }
61                 }
62             }
63         }else{
64             Integer gold = input.getIntegerByField("gold");
65             sum+=gold;
66         }
67         
68     }
69 
70     @Override
71     public void declareOutputFields(OutputFieldsDeclarer declarer) {
72         
73     }
74     
75     @Override
76     public Map<String, Object> getComponentConfiguration() {
77         HashMap<String, Object> hashMap = new HashMap<String, Object>();
78         hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60*2);
79         return hashMap;
80     }
81     
82 
83 }

参考代码LogProcessBolt2.java

  1 package yehua.stormpProject0521.bolt;
  2 
  3 import java.sql.Connection;
  4 import java.sql.ResultSet;
  5 import java.sql.SQLException;
  6 import java.sql.Statement;
  7 import java.util.Date;
  8 import java.util.HashMap;
  9 import java.util.Map;
 10 import java.util.Map.Entry;
 11 
 12 import org.apache.commons.collections.MapUtils;
 13 import org.apache.storm.Config;
 14 import org.apache.storm.Constants;
 15 import org.apache.storm.task.OutputCollector;
 16 import org.apache.storm.task.TopologyContext;
 17 import org.apache.storm.topology.OutputFieldsDeclarer;
 18 import org.apache.storm.topology.base.BaseRichBolt;
 19 import org.apache.storm.tuple.Tuple;
 20 
 21 import yehua.stormpProject0521.utils.DistributedLock;
 22 import yehua.stormpProject0521.utils.MyDateUtils;
 23 import yehua.stormpProject0521.utils.MyDbUtils;
 24 
 25 /**
 26  * 统计不同省份的金币消耗数据(1分钟)(柱状图)
 27  * 1    京    9200    2017-01-01
 28  * 2    津    5508    2017-01-01
 29  * 3    京    8562    2017-01-02
 30  * 4    津    4586    2017-01-02
 31  * 5    京    8954    2017-01-03
 32  * 6    津    2563    2017-01-03
 33  * 
 34  * @author yehua
 35  *
 36  */
 37 public class LogProcessBolt2 extends BaseRichBolt {
 38     private DistributedLock lock;
 39     @Override
 40     public void prepare(Map stormConf, TopologyContext context,
 41             OutputCollector collector) {
 42         this.lock = new DistributedLock("hadoop100:2181","test");
 43     }
 44     private Map<String, Integer> province_gold_map = new HashMap<String, Integer>();
 45     private Connection connection = null;
 46     @Override
 47     public void execute(Tuple input) {
 48         if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
 49             //定时任务
 50             String curr_time = MyDateUtils.formatDate4(new Date());
 51             try {
 52                 connection = MyDbUtils.getConnection();
 53                 Statement state = connection.createStatement();
 54                 lock.lock();//上锁
 55                 for (Entry<String, Integer> entry : province_gold_map.entrySet()) {
 56                     String province = entry.getKey();
 57                     Integer gold = entry.getValue();
 58                     // 入库之前,需要先查询一下,如果有数据,则执行更新操作,如果没有,则插入
 59                     state.execute("select id,province,gold from result2 where province = '"+province+"' and time = '"+curr_time+"'");
 60                     ResultSet resultSet = state.getResultSet();
 61                     if(resultSet.next()){//有数据
 62                         int id = resultSet.getInt(1);
 63                         int count = resultSet.getInt(3);
 64                         count+=gold;
 65                         state.executeUpdate("update result2 set gold = "+count+" where id = "+id);
 66                     }else{
 67                         state.executeUpdate("insert into result2(province,gold,time) values('"+province+"',"+gold+",'"+curr_time+"')");
 68                     }
 69                 }
 70                 System.out.println("执行入库成功: "+province_gold_map.size());
 71                 province_gold_map.clear();//注意,一定要把临时结果清空
 72             } catch (SQLException e) {
 73                 e.printStackTrace();
 74             }finally{
 75                 if(connection!=null){
 76                     try {
 77                         connection.close();
 78                     } catch (SQLException e) {
 79                         e.printStackTrace();
 80                     }
 81                 }
 82                 lock.unlock();//释放锁
 83             }
 84             
 85         }else{
 86             String province = input.getStringByField("province");//省份信息
 87             Integer gold = input.getIntegerByField("gold");
 88             province_gold_map.put(province, MapUtils.getInteger(province_gold_map, province, 0)+gold);
 89         }
 90         
 91     }
 92 
 93     @Override
 94     public void declareOutputFields(OutputFieldsDeclarer declarer) {
 95         
 96     }
 97     
 98     @Override
 99     public Map<String, Object> getComponentConfiguration() {
100         HashMap<String, Object> hashMap = new HashMap<String, Object>();
101         hashMap.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
102         return hashMap;
103     }
104 
105     @Override
106     public void cleanup() {
107         this.lock.closeZk();//关闭分布式共享锁使用的zk链接
108     }
109     
110     
111 
112 }
原文地址:https://www.cnblogs.com/braveym/p/6973037.html