UV（UserView，按页面统计独立访客数）——第二种方式（避免第一种方式中的 Set 饱和）：基于 Spark 算子的 Java 代码实现

 

UV（UserView，按页面统计独立访客数）——第二种方式（避免第一种方式中的 Set 饱和）：基于 Spark 算子的 Java 代码实现

 
测试数据
java代码
 1 package com.hzf.spark.study;
 2 
 3 import java.util.Map;
 4 import java.util.Set;
 5 
 6 import org.apache.spark.SparkConf;
 7 import org.apache.spark.api.java.JavaPairRDD;
 8 import org.apache.spark.api.java.JavaRDD;
 9 import org.apache.spark.api.java.JavaSparkContext;
10 import org.apache.spark.api.java.function.Function;
11 import org.apache.spark.api.java.function.PairFunction;
12 import org.apache.spark.broadcast.Broadcast;
13 
14 import scala.Tuple2;
15 
16 public class UVAnalysis02 {
17     public static void main(String[] args) {
18         SparkConf conf = new SparkConf().setAppName("UV_ANA").setMaster("local")
19                 .set("spark.testing.memory", "2147480000");
20         @SuppressWarnings("resource")
21         JavaSparkContext sc = new JavaSparkContext(conf);
22         JavaRDD<String> logRDD = sc.textFile("userLog1");
23         String str = "View";
24         final Broadcast<String> broadcast = sc.broadcast(str);
25         uvAnalyzeOptz(logRDD, broadcast);
26     }
27     
28     private static void uvAnalyzeOptz(JavaRDD<String> logRDD, final Broadcast<String> broadcast) {
29         JavaRDD<String> filteredLogRDD = logRDD.filter(new Function<String, Boolean>() {
30           
31             private static final long serialVersionUID = 1L;
32 
33             @Override
34             public Boolean call(String v1) throws Exception {
35                 String actionParam = broadcast.value();
36                 String action = v1.split("	")[5];
37                 return actionParam.equals(action);
38             }
39         });
40         
41         JavaPairRDD<String, String> up2LogRDD = filteredLogRDD.mapToPair(new PairFunction<String, String, String>() {
42 
43             private static final long serialVersionUID = 1L;
44 
45             @Override
46             public Tuple2<String, String> call(String val) throws Exception {
47                 String[] splited = val.split("	");
48                 String userId = splited[2];
49                 String pageId = splited[3];
50                  
51                 return new Tuple2<String, String>(userId + "_" + pageId,null);
52             }
53         });
54         
55         JavaPairRDD<String, Iterable<String>> groupUp2LogRDD = up2LogRDD.groupByKey();
56         
57         Map<String, Object> countByKey = groupUp2LogRDD.mapToPair(new PairFunction<Tuple2<String,Iterable<String>>, String, String>() {
58 
59             private static final long serialVersionUID = 1L;
60 
61             @Override
62             public Tuple2<String, String> call(Tuple2<String, Iterable<String>> tuple) throws Exception {
63                 String pu = tuple._1;
64                 String[] splited = pu.split("_");
65                 String pageId = splited[1];
66                 return new Tuple2<String, String>(pageId,null);
67             }
68         }).countByKey();
69         
70         Set<String> keySet = countByKey.keySet();
71         for (String key : keySet) {
72             System.out.println("PAGEID:"+key+"	UV_COUNT:"+countByKey.get(key));
73         }
74     }
75 }
View Code
result
 


 

 
原文地址:https://www.cnblogs.com/haozhengfei/p/1878742a6fb0471d68c5323c2a1567cc.html