titlesplit源码

-- Target table for the titlesplit loader: one row per title segment that
-- splits into exactly four space-separated tokens.
CREATE TABLE titlesplit (
    -- Surrogate key, assigned by MySQL.
    id              INT(12) UNSIGNED NOT NULL AUTO_INCREMENT,
    -- Copied from the source row: INNERSESSIONID, TIME (stored as text),
    -- CHANNELTYPE, SOURCETITLE and the full TITLE.
    innserSessionid VARCHAR(50),
    times           VARCHAR(50),
    channelType     VARCHAR(50),
    sourcetitle     VARCHAR(500),
    title           VARCHAR(500),
    -- The four tokens of one title segment, in their original order.
    words           VARCHAR(500),
    characters      VARCHAR(150),
    refer           VARCHAR(150),
    role            VARCHAR(150),
    -- Integer time value supplied by the loader when the row is inserted.
    Nowtime         INT(15),
    PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;

-- Adds a secondary index on the `words` column of `titlesplit`.
-- NOTE(review): `words` is VARCHAR(500) in a utf8 table (up to 1500 bytes);
-- on InnoDB row formats with a 767-byte index key limit this statement fails
-- or is silently truncated to a prefix index — confirm against the target
-- server, and switch to an explicit prefix such as `words`(191) if needed.
ALTER TABLE `titlesplit` ADD INDEX(`words`);
/**
  * Created by lkl on 2017/6/26.
  */
// spark-shell --driver-class-path /home/hadoop/test/mysqljdbc.jar
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.text.SimpleDateFormat
import java.util.Date
/**
  * Spark job that reads crawled news rows from the MySQL table
  * `s_data_Crawler_Common_WebPageNews`, splits every TITLE on `|`, and writes
  * each segment that consists of exactly four space-separated tokens into the
  * `titlesplit` table (one row per segment).
  */
object titlesplit {

  // JDBC URL used for the inserts.
  // NOTE(review): credentials are embedded in the URL; consider moving them to
  // external configuration.
  val rl = "jdbc:mysql://192.168.0.37:3306/emotional?user=root&password=123456&useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false"

  // Class.forName initialises the driver class so it registers itself with
  // DriverManager; a bare classOf[...] expression does not run the initialiser.
  Class.forName("com.mysql.jdbc.Driver")
  val conn = DriverManager.getConnection(rl)
  // Not used by the job itself; kept so the object's public members are unchanged.
  val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE)

  /**
    * Entry point: loads the source table over JDBC, splits the titles and
    * inserts the resulting rows. JDBC resources and the SparkContext are
    * released even when the job fails.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("test")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      val job = sqlContext.jdbc("jdbc:mysql://192.168.0.37:3306/emotional?user=root&password=123456", "s_data_Crawler_Common_WebPageNews")
      job.toDF().registerTempTable("job")
      val ed = sqlContext.sql("select `INNERSESSIONID`,`TIME`,`CHANNELTYPE`,`SOURCETITLE`,`TITLE` from job")

      val pp = ed.map(p => {
        val title = p.getString(4)
        // split takes a regex, so the pipe must be escaped; a null TITLE
        // yields no segments instead of a NullPointerException.
        val segments = if (title == null) Array.empty[String] else title.split("\\|")
        // A null TIME is passed through as null (stored as SQL NULL).
        val time = Option(p.getTimestamp(1)).map(_.toString).orNull
        (p.getString(0), time, p.getString(2), p.getString(3), title, segments)
      })

      pp.foreach(p => {
        for (segment <- p._6) {
          val parts = segment.split(" ")
          // Only segments with exactly four tokens are stored.
          if (parts.size == 4) {
            // Epoch seconds fit in an Int (until 2038); epoch milliseconds do
            // not, and truncating them with toInt stored a meaningless value.
            val nowSeconds = (System.currentTimeMillis() / 1000L).toInt
            insert(p._1, p._2, p._3, p._4, p._5, parts(0), parts(1), parts(2), parts(3), nowSeconds)
          }
        }
      })
    } finally {
      // Nested so that a failure in one close does not skip the others.
      try {
        statement.close()
      } finally {
        try {
          conn.close()
        } finally {
          sc.stop()
        }
      }
    }
  }

  /**
    * Inserts one row into `titlesplit`. Failures are printed and swallowed so
    * that a single bad row does not abort the whole job.
    *
    * @param value0 innserSessionid
    * @param value1 times
    * @param value2 channelType
    * @param value3 sourcetitle
    * @param value4 title
    * @param value5 words
    * @param value6 characters
    * @param value7 refer
    * @param value8 role
    * @param value9 Nowtime
    */
  def insert(value0: String, value1: String, value2: String, value3: String, value4: String, value5: String,
             value6: String, value7: String, value8: String, value9: Int): Unit = {
    var prep: java.sql.PreparedStatement = null
    try {
      prep = conn.prepareStatement("INSERT INTO titlesplit(innserSessionid,times,channelType,sourcetitle,title,words,characters,refer,role,Nowtime) VALUES (?,?,?,?,?,?,?,?,?,?) ")
      prep.setString(1, value0)
      prep.setString(2, value1)
      prep.setString(3, value2)
      prep.setString(4, value3)
      prep.setString(5, value4)
      prep.setString(6, value5)
      prep.setString(7, value6)
      prep.setString(8, value7)
      prep.setString(9, value8)
      prep.setInt(10, value9)
      prep.executeUpdate()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // Close the per-row statement; the original leaked one per insert.
      if (prep != null) {
        try {
          prep.close()
        } catch {
          case e: Exception => e.printStackTrace()
        }
      }
    }
  }
}
原文地址:https://www.cnblogs.com/canyangfeixue/p/7269301.html