使用spark dataSet 和rdd 解决 某个用户在某个地点待了多长时间

 

现有如下数据文件需要处理
格式:CSV
位置:hdfs://myhdfs/input.csv
大小:100GB
字段:用户ID,位置ID,开始时间,停留时长(分钟)

 

4行样例:

 

UserA,LocationA,2018-01-01 08:00:00,60
UserA,LocationA,2018-01-01 09:00:00,60
UserA,LocationB,2018-01-01 10:00:00,60
UserA,LocationA,2018-01-01 11:00:00,60

 

解读:

 

样例数据中的数据含义是:
用户UserA,在LocationA位置,从8点开始,停留了60分钟
用户UserA,在LocationA位置,从9点开始,停留了60分钟
用户UserA,在LocationB位置,从10点开始,停留了60分钟
用户UserA,在LocationA位置,从11点开始,停留了60分钟

 

该样例期待输出:
UserA,LocationA,2018-01-01 08:00:00,120
UserA,LocationB,2018-01-01 10:00:00,60
UserA,LocationA,2018-01-01 11:00:00,60

 

处理逻辑:
1 对同一个用户,在同一个位置,连续的多条记录进行合并
2 合并原则:开始时间取最早时间,停留时长加和

 

要求:请使用SparkMapReduce或其他分布式计算引擎处理

 

思路:按照按照用户ID和位置ID分组,分组之后按照时间列排序,由于数据之间的存在依赖关系,并且依赖关系比较连续,满足某种关系的数据要进行合并操作,因此使用sql部分的代码很难实现。在这使用的是将Dataset转化为RDD之后使用基于分区进行操作的方法处理数据。拿到相关的数据,按照时间顺序读取,判断,累加等进行处理。

 1 package com.zhf.streaming
 2 
 3 import java.text.SimpleDateFormat
 4 
 5 import org.apache.spark.Partitioner
 6 import org.apache.spark.rdd.RDD
 7 import org.apache.spark.sql.{Dataset, SparkSession}
 8 
 9 import scala.collection.mutable.ArrayBuffer
10 case class ResultData(userID:String,locationID:String,startTime:String,endTime:String,stayTime:Long)
11 object Test {
12   def main(args: Array[String]): Unit = {
13     val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
14     import spark.implicits._
15     import org.apache.spark.sql.functions._
16     val info = spark.read
17       .format("csv")
18       .option("path", "src/data/user.csv")
19       .load()
20       .toDF("userID", "locationID", "startTimes", "stayMinutes")
21       .as[(String, String, String, String)]
22 
23     val ds: Dataset[((String, String, String), ResultData)] = info.map {
24       case (userID, locationID, startTimes, stayMinutes) =>
25         //让起始时间+停留时间=结束时间
26         val sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
27         val date = sd.parse(startTimes)
28         val endTime = sd.format(date.getTime + (stayMinutes.trim.toInt * 60 * 1000))
29         ((userID, locationID, startTimes), ResultData(userID, locationID, startTimes, endTime, stayMinutes.trim.toLong))
30     }.as[((String, String, String), ResultData)]
31 
32     //按照用户ID和位置ID分组,分组之后按照时间列排序
33     val newDS: RDD[((String, String, String), ResultData)] = ds.rdd.repartitionAndSortWithinPartitions(new Partitioner {
34       override def numPartitions: Int = 4
35 
36       override def getPartition(key: Any): Int = key match {
37         case (userID, locationID, _) => (userID.hashCode + locationID.hashCode) % numPartitions
38         case _ => 0
39       }
40     })
41     val result = newDS.mapPartitions(iter => {
42       val listBuffer = iter.toBuffer
43       val buffer = ArrayBuffer.empty[ResultData]
44       var resultData: ResultData = null;
45       //分区内只有一个元素的情况
46       if (listBuffer.size == 1) {
47         resultData = listBuffer(0)._2;
48         buffer += resultData
49       } else {
50         //分区内有多个元素
51         listBuffer.foreach {
52           case ((userID, locationID, startTimes), currentData) =>
53             //初始化赋值
54             if (resultData == null) {
55               resultData = ResultData(userID, locationID, startTimes, currentData.endTime, currentData.stayTime)
56             } else {
57               //如果当前行的起始时间与上一行的结束时间相同
58               if (currentData.startTime == resultData.endTime) {
59                 //合并 修改初始值
60                 resultData = ResultData(currentData.userID, currentData.locationID, resultData.startTime, currentData.endTime, resultData.stayTime + currentData.stayTime)
61               } else {
62                 //不相同的情况下,将上一行结果添加到结果集,并修改初始值
63                 buffer += resultData
64                 resultData = currentData
65               }
66             }
67         }
68         //最后一个元素对象
69         if (resultData != null) {
70           buffer += resultData
71         }
72       }
73       buffer.toIterator
74     })
75     result.collect()
76       .sortBy(_.startTime)
77       .foreach(println)
78   }
79 }

 

原文地址:https://www.cnblogs.com/zhf123/p/11415118.html