// layer

/**
* Created by lkl on 2017/6/27.
*/
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.math.BigDecimal
/**
 * Spark job that reads whitespace-separated "word score" lines from a text file on HDFS
 * and inserts each pair into the MySQL table `layer` (columns `words`, `VALUE`).
 *
 * Database access is done per partition: every partition opens its own JDBC connection
 * and prepared statement and closes both when it is finished, so nothing is leaked on
 * the executors.
 */
object layer {

  import scala.util.control.NonFatal

  // NOTE(review): credentials are embedded in this URL. It is kept verbatim because `rl`
  // is a public member; consider moving it to configuration.
  val rl: String = "jdbc:mysql://10.19.65.17:54321/emotion?user=emotion&password=qingxu&useUnicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false"

  /** Parameterised insert used for every row (trailing space kept from the original SQL). */
  private val InsertSql = "INSERT INTO layer(words,VALUE) VALUES (?,?) "

  /**
   * Shared connection used by [[insert]].
   *
   * Lazy on purpose: merely loading this object (which also happens on every Spark
   * executor that runs a closure referencing it) must not open a connection.
   */
  lazy val conn: java.sql.Connection = openConnection()

  /** Statement on the shared connection; not used by this object itself, kept for callers. */
  lazy val statement: java.sql.Statement =
    conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE)

  /**
   * Entry point: loads the input file, exposes it as the temp table `layer`,
   * and writes every row to MySQL.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._

      val lines = sc.textFile("hdfs://ns1/user/songchunlin/汉语情感词极值表.txt")
      // Unparseable lines are dropped here instead of failing the whole job later.
      val parsed = lines.flatMap(line => parseLine(line).toList)
      parsed.toDF("words", "value").registerTempTable("layer")

      val rows = sqlContext.sql("select words,value from layer").rdd
        .map(row => (row.getString(0), row.getFloat(1)))

      rows.foreachPartition { partition =>
        // Skip the connection round-trip entirely for empty partitions.
        if (partition.nonEmpty) {
          val connection = openConnection()
          try {
            val prep = connection.prepareStatement(InsertSql)
            try {
              partition.foreach { case (word, score) =>
                println((word, score))
                execInsert(prep, word, score)
              }
            } finally {
              prep.close()
            }
          } finally {
            connection.close()
          }
        }
      }
    } finally {
      sc.stop()
    }
  }

  /**
   * Inserts a single (word, score) row using the shared [[conn]].
   *
   * Failures are printed and swallowed, so a bad row does not abort the caller.
   *
   * @param value0 the word
   * @param value1 the score stored in column `VALUE`
   */
  def insert(value0: String, value1: Float): Unit = {
    println((value0, value1))
    try {
      val prep = conn.prepareStatement(InsertSql)
      try execInsert(prep, value0, value1)
      finally prep.close()
    } catch {
      case NonFatal(e) => e.printStackTrace()
    }
  }

  /** Opens a new connection to `rl`, explicitly registering the MySQL driver first. */
  private def openConnection(): java.sql.Connection = {
    // Class.forName actually initialises the driver class (a bare classOf[...] does not).
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection(rl)
  }

  /** Binds one row to `prep` and executes it; errors are printed and swallowed (best effort). */
  private def execInsert(prep: java.sql.PreparedStatement, word: String, score: Float): Unit = {
    try {
      prep.setString(1, word)
      prep.setFloat(2, score)
      prep.executeUpdate()
    } catch {
      case NonFatal(e) => e.printStackTrace()
    }
  }

  /**
   * Parses one input line into (first token, last token as Float).
   *
   * @return `None` for blank lines, lines with fewer than two tokens, or a last token
   *         that is not a valid float; non-blank rejects are reported on stderr.
   */
  private def parseLine(line: String): Option[(String, Float)] = {
    val trimmed = line.trim
    val tokens = trimmed.split("\\s+")
    val parsed =
      if (tokens.length < 2) None
      else {
        try Some((tokens.head, tokens.last.toFloat))
        catch { case _: NumberFormatException => None }
      }
    if (parsed.isEmpty && trimmed.nonEmpty) {
      System.err.println("Skipping malformed line: " + line)
    }
    parsed
  }

}

// Original article: https://www.cnblogs.com/canyangfeixue/p/8566826.html