22 Umeng project -- Spark Streaming consuming Kafka and integrating Redis -- storing each user's minimum and maximum app-usage timestamps in Redis

Store the data in Redis in real time and keep each user's minimum and maximum usage timestamps up to date --> these are used to compute retention while avoiding full table scans (a sketch of how retention falls out of these two timestamps follows the list below).
1. Spark consumes from Kafka and parses the JSON log.
2. concat(appid,'#',appversion,'#',brand,'#',appplatform,'#',devicestyle,'#',ostype,'#',deviceid) ---> used as the key (all dimensions).
3. Compute the minimum timestamp min and maximum timestamp max of the user's app usage --> used as the value.
4. Store the key/value pair in Redis.
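A minimal sketch of how retention can later be read back out of those "min,max" values (the class name RetentionSketch, the day-boundary arithmetic, and the day count n are illustrative assumptions; host s101 and database 1 are taken from the code below):

import redis.clients.jedis.Jedis;

public class RetentionSketch {
    private static final long DAY_MS = 24L * 3600 * 1000;

    public static void main(String[] args) {
        int n = 1;                                   // e.g. "retained at least 1 day"
        long retained = 0;
        Jedis redis = new Jedis("s101", 6379);       // same Redis instance the streaming job writes to
        redis.select(1);
        for (String key : redis.keys("*")) {         // key = appid#appversion#brand#appplatform#devicestyle#ostype#deviceid
            String[] v = redis.get(key).split(",");  // value = "minTimestamp,maxTimestamp" in milliseconds
            long min = Long.parseLong(v[0]);
            long max = Long.parseLong(v[1]);
            if (max / DAY_MS - min / DAY_MS >= n) {  // last use is at least n days after first use
                retained++;
            }
        }
        redis.close();
        System.out.println("devices retained >= " + n + " day(s): " + retained);
    }
}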
 
Dependencies
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.17</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

RTDataCleanning 

package com.oldboy.umeng.spark.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.LocationStrategy;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Spark Streaming consumes from Kafka and stores the results in Redis.
 */
public class RTDataCleanning {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf();
        conf.setAppName("kafka");
        conf.setMaster("local[8]");

        // Create the SparkSession first
        final SparkSession spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
        // Project-specific helper (not shown here) that registers the custom UDFs used below, e.g. forkstartuplogs
        ExecSQLUtil.execRegisterFuncs(spark);

        // Create the Java streaming context with a 2-second batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(new JavaSparkContext(spark.sparkContext()), Durations.seconds(2));

        // Kafka parameters
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "s102:9092,s103:9092");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("group.id", "gssss");
        kafkaParams.put("enable.auto.commit", "true");

        // Location strategy: controls on which hosts the consumers are scheduled
        // Consumer strategy: controls which topics, partitions and offsets are consumed
        LocationStrategy ls = LocationStrategies.PreferConsistent();

        List<TopicPartition> tps = new ArrayList<TopicPartition>();
        tps.add(new TopicPartition("big12-umeng-raw-logs", 0));
        tps.add(new TopicPartition("big12-umeng-raw-logs", 1));
        tps.add(new TopicPartition("big12-umeng-raw-logs", 2));
        tps.add(new TopicPartition("big12-umeng-raw-logs", 3));

        // Consumer strategy
        ConsumerStrategy<String, String> cs = ConsumerStrategies.Assign(tps, kafkaParams);

        // Kafka message stream
        JavaInputDStream<ConsumerRecord<String, String>> ds1 = KafkaUtils.createDirectStream(ssc, ls, cs);

        // Extract the '#'-separated log line from each Kafka record
        JavaDStream<Row> ds2 = ds1.map(new Function<ConsumerRecord<String, String>, Row>() {
            public Row call(ConsumerRecord<String, String> v1) throws Exception {
                String topic = v1.topic();       // topic
                int par = v1.partition();        // partition
                long offset = v1.offset();       // offset
                String value = v1.value();       // value -- one log line
                String[] arr = value.split("#"); // split the log line into its fields
                // Build a Row from the fields
                return RowFactory.create(
                        Float.parseFloat(arr[0]),  // server time in ms
                        arr[1],                    // server time as string
                        arr[2],                    // client ip
                        Long.parseLong(arr[3]),    // client time in ms
                        Integer.parseInt(arr[4]),  // status code
                        arr[5]);                   // json payload
            }
        });

        // Process each RDD
        ds2.foreachRDD(new VoidFunction<JavaRDD<Row>>() {
            public void call(JavaRDD<Row> rdd) throws Exception {
                SparkSession spark = SparkSession.builder()
                        .config(rdd.context().getConf())
                        .enableHiveSupport()
                        .getOrCreate();
                // Structured fields -- specify the schema directly via StructType
                StructField[] fields = new StructField[6];
                fields[0] = new StructField("servertimems", DataTypes.FloatType, false, Metadata.empty());
                fields[1] = new StructField("servertimestr", DataTypes.StringType, false, Metadata.empty());
                fields[2] = new StructField("clientip", DataTypes.StringType, false, Metadata.empty());
                fields[3] = new StructField("clienttimems", DataTypes.LongType, false, Metadata.empty());
                fields[4] = new StructField("status", DataTypes.IntegerType, false, Metadata.empty());
                fields[5] = new StructField("log", DataTypes.StringType, false, Metadata.empty());
                StructType type = new StructType(fields);

                // Filter out invalid data
                Dataset<Row> df1 = spark.createDataFrame(rdd, type);
                // Temp view with columns servertimems, servertimestr, clientip, clienttimems, status, log -- equivalent to the raw_log table
                df1.createOrReplaceTempView("_temp");
                Dataset<Row> df2 = spark.sql("select forkstartuplogs(servertimestr , clienttimems , clientip , log) from _temp");
                df2.createOrReplaceTempView("_temp2");// equivalent to the appstartuplogs (startup log) table

                String aggSql = "select concat(appid,'#',appversion,'#',brand,'#',appplatform,'#',devicestyle,'#',ostype,'#',deviceid) key," +
                        "min(createdatms) mn," +
                        "max(createdatms) mx  from _temp2 group by " +
                        "concat(appid,'#',appversion,'#',brand,'#',appplatform,'#',devicestyle,'#',ostype,'#',deviceid)";
                // Aggregate the min/max per key inside the SQL statement
                // Optimization: why foreachPartition instead of foreach? foreach handles one element at a time,
                // while foreachPartition handles a whole partition (an iterator) at a time,
                // so only one Redis connection is opened per partition, which improves performance
                spark.sql(aggSql).foreachPartition(new ForeachPartitionFunction<Row>() {
                    public void call(Iterator<Row> t) throws Exception {
                        // Create the Redis client
                        Jedis redis = new Jedis("s101", 6379);
                        redis.select(1);// select the Redis database

                        // Redis stores the min and max createdatms (log creation time) per key
                        while (t.hasNext()) {
                            Row row = t.next();
                            String key = row.getAs("key");
                            long mn = row.getAs("mn");
                            long mx = row.getAs("mx");

                            String oldvalue = redis.get(key);
                            if (oldvalue == null) {
                                redis.set(key, mn + "," + mx);
                            } else {
                                String[] arr = oldvalue.split(",");
                                long oldMin = Long.parseLong(arr[0]);
                                long oldMax = Long.parseLong(arr[1]);
                                redis.set(key, Math.min(mn, oldMin) + "," + Math.max(mx, oldMax));
                            }
                        }
                        redis.close();
                    }
                });
            }
        });

        ssc.start();
        ssc.awaitTermination();
    }
}

RedisUtil -- Redis storage utility class

package com.oldboy.umeng.spark.stat;

import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Redis utility class
 */
public class RedisUtil {
    private static Jedis redis = new Jedis("s101", 6379);

    static {
        // use database 1
        redis.select(1);
    }

    /**
     * Update the stored "min,max" value for a key in Redis
     */
    public static void updateKey(String key, long min, long max) {
        String oldvalue = redis.get(key);
        if (oldvalue == null) {
            redis.set(key, min + "," + max);
        } else {
            String[] arr = oldvalue.split(",");
            long oldMin = Long.parseLong(arr[0]);
            long oldMax = Long.parseLong(arr[1]);
            redis.set(key, Math.min(min, oldMin) + "," + Math.max(max, oldMax));
        }
    }

    public static List<String> keys() {
        Set<String> keys = redis.keys("*");
        return new ArrayList<String>(keys);
    }
}
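A possible caller, sketched under the assumption that a downstream stat job runs on the same classpath (the class RedisUtilDemo and the sample key/timestamps are hypothetical, not from the original post); it performs the same "min,max" merge that the streaming job does inline in its foreachPartition:

package com.oldboy.umeng.spark.stat;

import java.util.List;

public class RedisUtilDemo {
    public static void main(String[] args) {
        // Merge one observation into the stored "min,max" for a dimension key (hypothetical key and timestamps)
        String key = "sdk34734#3.2.1#Huawei#android#Mate10#android#dev001";
        RedisUtil.updateKey(key, 1541520000000L, 1541606400000L);

        // List all dimension keys currently stored (backed by KEYS *, so only suitable for modest key counts)
        List<String> keys = RedisUtil.keys();
        System.out.println("stored keys: " + keys.size());
    }
}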
Original article: https://www.cnblogs.com/star521/p/9961540.html