Trident integration with Kafka

1. Kafka-related classes

  The first class, OpaqueTridentKafkaSpout, provides opaque transactional semantics.

  The second, TransactionalTridentKafkaSpout, provides fully transactional semantics. The driver class below uses the opaque variant.
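  Given a TridentKafkaConfig conf like the one built in the driver class below, the two spouts are constructed the same way. A minimal sketch (TransactionalTridentKafkaSpout is the storm-kafka class for the fully transactional case):

import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;

// Opaque transactional: tolerates a replayed batch that differs from the original batch
OpaqueTridentKafkaSpout opaqueSpout = new OpaqueTridentKafkaSpout(conf);

// Fully transactional: a replayed batch must contain exactly the same messages
TransactionalTridentKafkaSpout txSpout = new TransactionalTridentKafkaSpout(conf);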

  

2. Starting the services

  Before running the topology, ZooKeeper and the Kafka broker must be running, and the nginxlog topic must exist.

3. Driver class

  Compared with the earlier Trident examples, there are two important changes:

  1. The data source is Kafka.

  2. The input field name in Fields is "str", the single field that StringScheme emits for each Kafka message.

package com.jun.tridentWithKafka;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.testing.MemoryMapState;

public class TridentWithKafka {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TridentTopology tridentTopology = new TridentTopology();

        // Use Kafka as the data source
        BrokerHosts hosts = new ZkHosts("linux-hadoop01.ibeifeng.com:2181");
        String topic = "nginxlog";
        TridentKafkaConfig conf = new TridentKafkaConfig(hosts, topic);

        // StringScheme deserializes each Kafka message into a single field named "str"
        conf.scheme = new SchemeAsMultiScheme(new StringScheme());
        conf.forceFromStart = true;

        OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(conf);

        // Stream processing
        Stream stream = tridentTopology.newStream("orderAnalyse", spout)
                // filter out invalid log lines
                .each(new Fields("str"), new ValidLogFilter())
                // parse the raw line into the order fields
                .each(new Fields("str"), new LogParserFunction(), new Fields("orderId", "orderTime", "orderAmtStr", "memberId"))
                // keep only the parsed fields (projection)
                .project(new Fields("orderId", "orderTime", "orderAmtStr", "memberId"))
                // split the timestamp into day/hour/minute ("minter") components
                .each(new Fields("orderTime"), new DateTransFormerFunction(), new Fields("day", "hour", "minter"));

        // Branch 1: order count per minute, grouped aggregation
        TridentState state = stream.groupBy(new Fields("minter"))
                // global aggregation, keeping state in memory
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("orderNumByMinter"));
//        state.newValuesStream().each(new Fields("minter", "orderNumByMinter"), new PrintFilter());

        // Branch 2: order amount per minute, partition-local aggregation first
        Stream partitionStream = stream.each(new Fields("orderAmtStr"), new TransforAmtToDoubleFunction(), new Fields("orderAmt"))
                .groupBy(new Fields("minter"))
                // partition-local aggregation
                .chainedAgg()    // start of the aggregation chain
                .partitionAggregate(new Fields("orderAmt"), new LocalSum(), new Fields("orderAmtSumOfLocal"))
                .chainEnd();     // end of the aggregation chain
//        partitionStream.each(new Fields("minter", "orderAmtSumOfLocal"), new PrintFilter());

        // Then one global aggregation over the partial sums
        TridentState partitionState = partitionStream.groupBy(new Fields("minter"))
                // global aggregation
                .persistentAggregate(new MemoryMapState.Factory(), new Fields("orderAmtSumOfLocal"), new Sum(), new Fields("totalOrderAmt"));
        partitionState.newValuesStream().each(new Fields("minter", "totalOrderAmt"), new PrintFilter());

        // Submit the topology
        Config config = new Config();
        if (args == null || args.length <= 0) {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("tridentDemo", config, tridentTopology.build());
        } else {
            config.setNumWorkers(2);
            StormSubmitter.submitTopology(args[0], config, tridentTopology.build());
        }
    }
}
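  The helper classes referenced above (ValidLogFilter, LogParserFunction, DateTransFormerFunction, TransforAmtToDoubleFunction, LocalSum, PrintFilter) are not shown here. As a hypothetical sketch of the Trident filter and function pattern, assuming the raw log line is comma-separated as orderId,orderTime,orderAmt,memberId (the actual format is not given), two of them might look like this:

// ValidLogFilter.java -- keeps only lines with the expected number of fields
package com.jun.tridentWithKafka;

import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;

public class ValidLogFilter extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        String line = tuple.getString(0);
        // assumed format: orderId,orderTime,orderAmt,memberId
        return line != null && line.split(",").length == 4;
    }
}

// LogParserFunction.java -- splits the line into the four order fields
package com.jun.tridentWithKafka;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class LogParserFunction extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String[] parts = tuple.getString(0).split(",");
        collector.emit(new Values(parts[0], parts[1], parts[2], parts[3]));
    }
}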

4. Input data
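  One hypothetical way to feed test input: write comma-separated order lines to the nginxlog topic with a small Kafka producer. The broker address, port, and record format below are assumptions; the format only has to match what LogParserFunction expects.

package com.jun.tridentWithKafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrderLogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // assumed broker address and port; adjust to the actual Kafka broker
        props.put("bootstrap.servers", "linux-hadoop01.ibeifeng.com:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // hypothetical line: orderId,orderTime,orderAmt,memberId
        producer.send(new ProducerRecord<String, String>("nginxlog",
                "order001,2018-06-11 10:01:30,99.50,member42"));
        producer.close();
    }
}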

  

5. Console
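  With the topology running, the per-minute totals are printed to the console by PrintFilter. Its source is not shown in the post; a common Trident debugging idiom is a pass-through filter that prints every tuple, roughly:

// PrintFilter.java -- prints each tuple for inspection and keeps it in the stream
package com.jun.tridentWithKafka;

import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;

public class PrintFilter extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        System.out.println(tuple);
        return true;
    }
}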

  

  

Original article: https://www.cnblogs.com/juncaoit/p/9169420.html