Spark Operation Examples

Startup commands

./bin/spark-shell
./bin/spark-shell --master yarn-client    // launch on YARN (client mode)
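Note: in Spark 2.x and later the yarn-client master string is deprecated; assuming a configured YARN cluster, the equivalent command is:

./bin/spark-shell --master yarn --deploy-mode client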

Example 1

val df=sql("select * from default.orders")

df.select("user_id").distinct.count()

// selectExpr accepts Hive SQL expressions
df.selectExpr("max(cast(user_id as int))").show()

df.groupBy("order_dow").count().show()

cache and persist keep data in memory

val priors = spark.sql("select * from default.order_products_prior")
val df2 = df.join(priors,"order_id").cache
val df1 = df.groupBy("order_dow").count().cache()
df2.unpersist()  // release from memory
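For finer control over the storage level, persist can be used with an explicit StorageLevel (a minimal sketch; for DataFrames cache() is equivalent to persist(StorageLevel.MEMORY_AND_DISK)):

import org.apache.spark.storage.StorageLevel
val df3 = df.join(priors,"order_id").persist(StorageLevel.MEMORY_AND_DISK)
df3.count()      // an action is needed before the data is actually cached
df3.unpersist()  // release it when no longer needed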

Example 2

import org.apache.spark.sql.SparkSession

object TestFunc {
  def main(args: Array[String]): Unit = {
//    Instantiate a SparkSession; in spark-shell one is created automatically on the client
//    and announced as: Spark session available as 'spark'.
    val spark = SparkSession
      .builder()
      .appName("test")
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()

    val df = spark.sql("select * from badou.orders")
    val priors = spark.sql("select * from badou.order_products_prior")

    """
      |4.每个用户根据order_hour_of_day这列的值对order_dow进行排序
      |1  2 08
      |1  3 07
      |
      |1 [(2,08),(3,07)]
      |
      |=> 1 [(3,07),(2,08)] 一个用户最喜爱购买商品的top3
      |rdd: (user_id,(order_number,order_hour_of_day))   
    """.stripMargin

    import spark.implicits._
    val orderNumberSort = df.select("user_id","order_number","order_hour_of_day")
      .rdd.map(x=>(x(0).toString,(x(1).toString,x(2).toString)))    // convert the DataFrame to an RDD
      .groupByKey()
      .mapValues(_.toArray.sortWith(_._2<_._2).slice(0,2))  // sort by hour (string compare) and keep the first two entries
      .toDF("user_id","order_sort_by_hour")

//    udf
    import org.apache.spark.sql.functions._
    val plusUDF = udf((col1:String,col2:String)=>col1.toInt+col2.toInt)
    df.withColumn("plus",plusUDF(col("order_number"),col("order_dow"))).show()


  }
}

Word count

val file = sc.textFile("/data/The_Man_of_Property.txt")
//file.take(3)
//file.flatMap(line=>line.split(" ")).take(10)
//file.flatMap(line=>line.split(" ")).map((_,1)).take(10)
//file.flatMap(line=>line.split(" ")).map((_,1)).reduceByKey(_+_).take(10)
file.flatMap(line=>line.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/data/output/wc")
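To inspect the most frequent words directly instead of writing everything to HDFS, a small follow-up on the same RDD (a sketch) sorts the counts in descending order:

file.flatMap(line=>line.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2, ascending = false).take(10)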

Jieba word segmentation

import com.huaban.analysis.jieba.{JiebaSegmenter, SegToken}
import com.huaban.analysis.jieba.JiebaSegmenter.SegMode
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object JiebaKry {
  def main(args: Array[String]): Unit = {
    //    Register the Jieba segmenter class with Kryo serialization
    val conf = new SparkConf()
      .registerKryoClasses(Array(classOf[JiebaSegmenter]))
      .set("spark.rpc.message.maxSize","800")
    //    Build the SparkSession and pass in the Conf defined above
    val spark = SparkSession
      .builder()
      .appName("Jieba UDF")
      .enableHiveSupport()
      .config(conf)
      .getOrCreate()

    // Jieba segmentation helper: takes a DataFrame and returns it with an extra column seg containing the segmented text
    def jieba_seg(df:DataFrame,colname:String): DataFrame ={

      val segmenter = new JiebaSegmenter()
      val seg = spark.sparkContext.broadcast(segmenter)
      val jieba_udf = udf{(sentence:String)=>
        val segV = seg.value
        segV.process(sentence.toString,SegMode.INDEX)
          .toArray().map(_.asInstanceOf[SegToken].word)
          .filter(_.length>1).mkString("/")
      }
      df.withColumn("seg",jieba_udf(col(colname)))
    }

    val df = spark.sql("select sentence,label from badou.news_noseg limit 300")
    val df_seg = jieba_seg(df,"sentence")
    df_seg.show()
//    df_seg.write.mode("overwrite").saveAsTable("badou.news_jieba")
  }
}

Simple data processing (feature extraction)

package offline

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
object SimpleFeature {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("test")
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()
    val priors = spark.sql("select * from badou.order_products_prior")
    val orders = spark.sql("select * from badou.orders")
    /** product features
      * 1. sales count: prod_cnt
      * 2. number of times the product was reordered: prod_sum_rod
      * 3. reorder rate: prod_rod_rate = sum/count, in [0,1]
     */
//    1. sales count prod_cnt
    val prodCnt = priors.groupBy("product_id").count().withColumnRenamed("count","prod_cnt")
//    2 & 3. prod_sum_rod and prod_rod_rate (plus prod_cnt again) in a single aggregation
    val prodRodCnt = priors.selectExpr("product_id","cast(reordered as int)")
      .groupBy("product_id")
      .agg(sum("reordered").as("prod_sum_rod"),
        avg("reordered").as("prod_rod_rate"),
        count("product_id").as("prod_cnt")
      )
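//    Note: prod_cnt is recomputed inside this aggregation, so the separate prodCnt above becomes redundant
//    once the three product features are taken from prodRodCnt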

    /** user features:
      * 1. average gap in days between each user's orders
      * 2. total number of orders per user
      * 3. the de-duplicated set of products each user has bought:  user_id , set{prod1,prod2....}
      * 4. total number of products per user and the distinct product count
      * 5. average number of products per order for each user
      */
//  Handle missing values: replace empty strings in days_since_prior_order with 0.0
    val ordersNew = orders.selectExpr("*",
      "if(days_since_prior_order='',0.0,days_since_prior_order) as dspo")
      .drop("days_since_prior_order")

//    1. average gap in days between each user's orders
    val userGap = ordersNew.selectExpr("user_id","cast(dspo as int) as dspo")
      .groupBy("user_id").avg("dspo").withColumnRenamed("avg(dspo)","u_avg_day_gap")
//    2. total number of orders per user
    val userOrdCnt = orders.groupBy("user_id").count()
//    3. the de-duplicated set of products each user has bought (user_id -> products)
    val opDF = orders.join(priors,"order_id")
    val up = opDF.select("user_id","product_id")

    import spark.implicits._
//    up.rdd.map() converts the DataFrame into an RDD,
//    and rdd.toDF() converts it back; a Tuple2 is returned here, so the DataFrame has two columns
    val userUniOrdRecs = up.rdd.map{x=>(x(0).toString,x(1).toString)}
      .groupByKey()
      .mapValues(_.toSet.mkString(","))
      .toDF("user_id","prod_records")
//    4. total number of products per user and the distinct product count
    val userAllProd = up.groupBy("user_id").count()

    val userUniOrdCnt = up.rdd.map{x=>(x(0).toString,x(1).toString)}
      .groupByKey()
      .mapValues(_.toSet.size)
      .toDF("user_id","prod_dist_cnt")

//    When two computations share the same groupByKey logic, consider merging them:
//    here, the "distinct product set" and "distinct product count" statistics.
//    Option 1: factor out the shared groupByKey result and cache it
    val userRddGroup = up.rdd.map(x=>(x(0).toString,x(1).toString)).groupByKey().cache()
//    val userUniOrdRecs = userRddGroup.mapValues(_.toSet.mkString(",")).toDF("user_id","prod_records")
//    val userUniOrdCnt = userRddGroup.mapValues(_.toSet.size).toDF("user_id","prod_dist_cnt")
    userRddGroup.unpersist()  // release the cache once both results are computed (like `del` in Python)

    // Option 2: compute both results in a single pass
    val userProRcdSize = up.rdd.map{x=>(x(0).toString,x(1).toString)}.groupByKey()
      .mapValues{records=>
        val rs = records.toSet;
        (rs.size,rs.mkString(","))
      }.toDF("user_id","tuple")
      .selectExpr("user_id","tuple._1 as prod_dist_size","tuple._2 as prod_records")

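//    Option 3: the same result entirely with DataFrame aggregations (collect_set), no RDD round-trip needed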
    val usergroup = up.groupBy("user_id")
      .agg(size(collect_set("product_id")).as("prod_dist_size"),
        collect_set("product_id").as("prod_records"))
//    5. average number of products per order for each user
//    1) first count how many products each order contains
    val ordProdCnt = priors.groupBy("order_id").count()
//    2) then average those per-order counts for each user
    val userPerOrdProdCnt = orders.join(ordProdCnt,"order_id")
      .groupBy("user_id")
      .agg(avg("count").as("u_avg_ord_prods"))
  }

  // Stub: currently returns priors unchanged
  def feat(priors:DataFrame,orders:DataFrame):DataFrame={
    priors
  }

}

 

References

八斗大数据 (Badou Big Data)

Original article: https://www.cnblogs.com/xumaomao/p/12681326.html