014 在Spark中完成PV与UV的计算,重在源代码

1.代码

 1 object LogPVAndUV{
 2     def main(args:Array[String]):Unit={
 3         val conf=new SparkConf()
 4             .setMaster("local[*]")
 5             .setAppName("PVAndUV")
 6         val sc=SparkContext.getOrCreate(conf)
 7         val logPath="/user/beifeng/spark/logs/page_views.data"
 8         val logRDD=sc.textFile(logPath)
 9         val filterRDD=logRDD.filter(_.length>0)
10         //转换
11         val mapRDD=filterRDD.map(line=>{
12             val arr=line.split("	")
13             if(arr.length==7){
14                 val date=arr(0).trim
15                 val url=arr(1)
16                 val uuid=arr(2)
17                 (date.subString(0,Math.min(10.date.length)).trim,url,uuid)
18             }else{
19                 (null,null,null)
20             }
21         }).filter(tuple=>tuple._1!=null&&tuple._1.length>0)
22         //PV计算
23         val pvRDD=mapRDD
24             .filter(tuple=>tuple._2.length>0)
25             .map(tuple=>(tuple._1,1))
26             .reduceByKey(_+_)
27         //UV计算
28         val uvRDD=mapRDD
29             .filter(tuple=>tuple._3.length>0)
30             .map(tuple=>(tuple._1,tuple._3))
31             .distinct
32             .reduceByKey(_+_)
33         //合并
34         val pvAndUv=pvRDD.join(uvRDD).map{
35             case (date,(pv,uv))=>{
36                 (date,pv,uv)
37             }
38         }
39         //输出
40         pvAndUv.saveAsTextFile("/user/beifeng/spark/output/"+System.currentTimeMillis())
41         sc.stop()
42     }
43 }

2.PS

  rdd.foreachPartition(iter=>{

    //

  })

  对iter迭代器中的数据进行输出,iter表示的是一个分区的所有数据,这里的迭代器和groupbyKey中的实现方式不同,不会产生OOM

  主要用于将数据输出到非HDFS的存储系统中,不如MYSQL,Redis

原文地址:https://www.cnblogs.com/juncaoit/p/6390866.html