学习进度(5)

视频:spark,1h

代码:1h 150行

小Demo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author ywq
 * @date 2021/3/9 10:12
 */// TODO : 计算页面转换率
object PageflowAnalysis {

  def main(args: Array[String]): Unit = {
    var conf=new SparkConf().setMaster("local[*]").setAppName("HotCategoryTop10Analysis")
    var sc=new SparkContext(conf)
    //读取文件
    val action=sc.textFile("data/user_visit_action.txt")

    //统计页面对象
    val actionData=action.map(
      line=> {
       val datas=line.split("_")
        UserVisitAction(
          datas(0),
          datas(1).toLong,
          datas(2),
          datas(3).toLong,
          datas(4),
          datas(5),
          datas(6).toLong,
          datas(7).toLong,
          datas(8),
          datas(9),
          datas(10),
          datas(11),
          datas(12).toLong,
                    )
      }
    )

    //缓存
    actionData.cache()
    //生成的满足条件的id集合:1-2 2-3....
    val ids=List[Long](1,2,3,4,5,6,7)
    val okflowids:List[(Long,Long)]=ids.zip(ids.tail)

    //计算每个页面出现的次数(wordcount):(page_id,sum)
    val pageToCountMap:Map[Long,Long]=actionData.filter(
      line=>
        {
          //合法页面跳转的首页面的范围
          ids.init.contains(line.page_id)
        }
    ).map(
      //转换数据格式
      line=>
        {
          //L:Long型整数
          (line.page_id,1L)
        }
    ).reduceByKey(_+_).collect().toMap

    //计算跳转的次数,数据格式为:(1,2),次数
    //根据session_id进行分组,获取每个用户一个session内的访问页面,返回(sessionid,集合)
    val sess:RDD[(String,Iterable[UserVisitAction])]=actionData.groupBy(_.session_id)

    //将不合法的页面跳转进行过滤
    //mapValues:迭代value
    val mvRDD: RDD[(String, List[((Long, Long), Int)])] = sess.mapValues(
      iter=>
        {
          //按访问时间升序排列
          val sortList:List[UserVisitAction]=iter.toList.sortBy(_.action_time)

          val flowIds:List[Long]=sortList.map(_.page_id)
          //出现的页面跳转
          val pageflowIds:List[(Long,Long)]=flowIds.zip(flowIds.tail)
          //将不合法的页面跳转进行过滤
          pageflowIds.filter(
            t=>
              {
                okflowids.contains(t)
              }
          ).map(
            t=>
              {
                (t,1)
              }
          )
        }
    )

    //统计各种类型页面跳转的数目
    val flatRDD: RDD[((Long, Long), Int)]=mvRDD.map(_._2).flatMap(list=>list)
    // (pageid1, pageid2), sum
    val dataRDD = flatRDD.reduceByKey(_+_)

    //计算跳转率:跳转的次数/第一个页面出现的次数
    dataRDD.foreach
    {
      case  ((page1,page2),sum)=>
        {
          val lon=pageToCountMap.getOrElse(page1,0L)
          println(s"页面${page1}跳转到页面${page2}单跳转换率为:" + ( sum.toDouble/lon ))
        }
    }
    sc.stop()
  }

  //用户访问动作表
  case class UserVisitAction
 (
  date:String,
  user_id:Long,
  session_id:String,
  page_id:Long,
  action_time:String,
  search_keyword: String,//用户搜索的关键词
  click_category_id: Long,//某一个商品品类的ID
  click_product_id: Long,//某一个商品的ID
  order_category_ids: String,//一次订单中所有品类的ID集合
  order_product_ids: String,//一次订单中所有商品的ID集合
  pay_category_ids: String,//一次支付中所有品类的ID集合
  pay_product_ids: String,//一次支付中所有商品的ID集合
  city_id: Long
 )
}

  结果截图:

 反思:熟练使用算子的用法和分析得出想要的结果需要的数据格式很重要

原文地址:https://www.cnblogs.com/ywqtro/p/14507684.html