Scala code for parsing unstructured data with Spark and storing it in Hive

// Submit the packaged job (the --class value must match the object name below)
// /usr/local/spark/bin$ spark-submit --class "split" /data/chun/sparktes.jar

import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
object split {
  def main(args: Array[String]): Unit = {

  // Spark 1.x entry points; the HiveContext is what lets saveAsTable below write to Hive.
  val cf = new SparkConf().setAppName("ass").setMaster("local")
  val sc = new SparkContext(cf)
  val sqlContext = new SQLContext(sc)
  val hc = new HiveContext(sc)

  // The job processes the logs from nine days ago; the date becomes part of the HDFS path.
  val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
  val date = format.format(new java.util.Date().getTime - 9L * 24 * 60 * 60 * 1000)

  // One directory per day on HDFS, gzipped log files underneath.
  val lg = sc.textFile("hdfs://master:9000/data/" + date + "/*/*.gz")


  // Field set 1: device and playback attributes. Each value is cut out of the raw line by
  // splitting on its JSON key ("key":" for string fields, "key": for numeric fields) and
  // then on the closing quote or comma.
  val filed1 = lg.map(l => (l.split("\"android_id\":\"").last.split("\"").head,
    l.split("\"anylst_ver\":").last.split(",").head,
    l.split("\"area\":\"").last.split("\"").head,
    l.split("\"build_CPU_ABI\":\"").last.split("\"").head,
    l.split("\"build_board\":\"").last.split("\"").head,
    l.split("\"build_model\":\"").last.split("\"").head,
    l.split("\"city\":\"").last.split("\"").head,
    l.split("\"country\":\"").last.split("\"").head,
    l.split("\"cpuCount\":").last.split(",").head,
    l.split("\"cpuName\":\"").last.split("\"").head,
    l.split("\"custom_uuid\":\"").last.split("\"").head,
    l.split("\"cid\":\"").last.split("\"").head,
    l.split("\"definition\":\"").last.split("\"").head,
    l.split("\"firstTitle\":\"").last.split("\"").head,
    l.split("\"modeType\":\"").last.split("\"").head,
    l.split("\"pageName\":\"").last.split("\"").head,
    l.split("\"playIndex\":\"").last.split("\"").head,
    l.split("\"rectime\":").last.split(",").head,
    l.split("\"time\":\"").last.split("\"").head))
  val scoreDataFrame1 = hc.createDataFrame(filed1).toDF("android_id","anylst_ver","area","build_CPU_ABI","build_board","build_model","city","country","cpuCount","cpuName","custom_uuid","cid","definition","firstTitle","modeType","pageName","playIndex","rectime","time")
  scoreDataFrame1.write.mode(SaveMode.Append).saveAsTable("test.f1")




  // Field set 2: play session, network and memory attributes, extracted the same way.
  val filed2 = lg.map(l => (l.split("\"custom_uuid\":\"").last.split("\"").head,
    l.split("\"playType\":\"").last.split("\"").head,
    l.split("\"prevName\":\"").last.split("\"").head,
    l.split("\"prevue\":").last.split(",").head,
    l.split("\"siteName\":\"").last.split("\"").head,
    l.split("\"title\":\"").last.split("\"").head,
    l.split("\"uuid\":\"").last.split("\"").head,
    l.split("\"vod_seek\":\"").last.split("\"").head,
    l.split("\"device_id\":\"").last.split("\"").head,
    l.split("\"device_name\":\"").last.split("\"").head,
    l.split("\"dpi\":").last.split(",").head,
    l.split("\"eth0_mac\":\"").last.split("\"").head,
    l.split("\"ip\":\"").last.split("\"").head,
    l.split("\"ipaddr\":\"").last.split("\"").head,
    l.split("\"isp\":\"").last.split("\"").head,
    l.split("\"largeMem\":").last.split(",").head,
    l.split("\"limitMem\":").last.split(",").head,
    l.split("\"packageName\":\"").last.split("\"").head,
    l.split("\"rectime\":").last.split(",").head,
    l.split("\"time\":\"").last.split("\"").head))
  import sqlContext.implicits._
  val scoreDataFrame2 = hc.createDataFrame(filed2).toDF("custom_uuid","playType","prevName","prevue","siteName","title","uuid","vod_seek","device_id","device_name","dpi","eth0_mac","ip","ipaddr","isp","largeMem","limitMem","packageName","rectime","time")
  scoreDataFrame2.write.mode(SaveMode.Append).saveAsTable("test.f2")

  // Field set 3: screen, build and channel attributes.
  val filed3 = lg.map(l => (l.split("\"custom_uuid\":\"").last.split("\"").head,
    l.split("\"region\":\"").last.split("\"").head,
    l.split("\"screenHeight\":").last.split(",").head,
    l.split("\"screenWidth\":").last.split(",").head,
    l.split("\"serial_number\":\"").last.split("\"").head,
    l.split("\"touchMode\":").last.split(",").head,
    l.split("\"umengChannel\":\"").last.split("\"").head,
    l.split("\"vercode\":").last.split(",").head,
    l.split("\"vername\":\"").last.split("\"").head,
    l.split("\"wlan0_mac\":\"").last.split("\"").head,
    l.split("\"rectime\":").last.split(",").head,
    l.split("\"time\":\"").last.split("\"").head))


  val scoreDataFrame3= hc.createDataFrame(filed3).toDF("custom_uuid","region","screenHeight","screenWidth","serial_number","touchMode","umengChannel","vercode","vername","wlan0_mac","rectime","time")
  scoreDataFrame3.write.mode(SaveMode.Append).saveAsTable("test.f3")
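
  // Optional sanity check (not in the original post): uncomment to spot-check the
  // freshly written tables before the job exits.
  // hc.sql("SELECT custom_uuid, rectime, time FROM test.f3 LIMIT 10").show()
  // hc.sql("SELECT COUNT(*) AS cnt FROM test.f1").show()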

  }
}
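
The extraction above treats every line as plain text and carves each value out with paired split calls. If the log lines are in fact well-formed JSON objects (an assumption; the post itself does not say so), Spark's built-in JSON reader can infer the schema and pull the same fields declaratively. A minimal sketch of that variant, reusing the path convention from the code above; the object name jsonVariant and the target table test.f3_json are made up for illustration:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: assumes one well-formed JSON object per log line.
object jsonVariant {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ass-json").setMaster("local"))
    val hc = new HiveContext(sc)

    // Same date and path convention as the job above.
    val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
    val date = format.format(new java.util.Date().getTime - 9L * 24 * 60 * 60 * 1000)

    // Let Spark infer the schema from the JSON lines instead of splitting on each key.
    val json = hc.read.json(sc.textFile("hdfs://master:9000/data/" + date + "/*/*.gz"))
    json.select("custom_uuid", "region", "screenHeight", "screenWidth",
        "serial_number", "touchMode", "umengChannel", "vercode", "vername",
        "wlan0_mac", "rectime", "time")
      .write.mode(SaveMode.Append).saveAsTable("test.f3_json") // hypothetical target table
  }
}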
Original article: https://www.cnblogs.com/canyangfeixue/p/9066647.html