Spark Streaming: storing data to MySQL

package sparkStreaming

import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import java.sql.{DriverManager, PreparedStatement, Connection}

object WebPagePopularityValueCalculator {

  def main(args: Array[String]) {

    val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_test_topic")
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicpMap = topics.split(",").map((_, 2)).toMap
    
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val popularityData = lines.map { msgLine =>
      // message format: pageID|v1|v2|v3 (pipe-separated, so the regex needs an escaped "|")
      val dataArr: Array[String] = msgLine.split("\\|")
      val pageID = dataArr(0)
      // weighted popularity value computed from the three source fields
      val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
      (pageID, popValue)
    }
    //sum the previous popularity value and current value
    val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
      iterator.map { case (pageID, newValues, state) =>
        (pageID, newValues.sum + state.getOrElse(0.0))
      }
    }
    val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))
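    // updateStateByKey keeps a running popularity total per page across batches;
    // the HashPartitioner controls how the state is partitioned, the boolean asks
    // Spark to remember that partitioner, and initialRDD seeds the initial state.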
    val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
    //set the checkpoint interval to avoid checkpointing too frequently, which may
    //significantly reduce operation throughput
    stateDstream.checkpoint(Duration(8 * 2 * 1000))
    //after calculation, we need to sort the result and only show the top 10 hot pages
    stateDstream.foreachRDD { rdd =>
      val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false)
      val topKData = sortedData.take(10).map { case (v, k) => (k, v) }
      topKData.foreach { case (k, v) =>
        if (v != 0) {
          println("page: " + k + "  value: " + v)
          // wrap the single record in an Iterator and write it to MySQL
          toMySql(Iterator((k, v)))
        }
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
  
  def toMySql(iterator: Iterator[(String, Double)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into userbehavior(page, number) values (?, ?)"
    try {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
      // prepare the statement once and reuse it for every record in the iterator
      ps = conn.prepareStatement(sql)
      iterator.foreach { case (page, value) =>
        ps.setString(1, page)
        ps.setDouble(2, value)
        ps.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }
  
}
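
The toMySql helper above opens a new JDBC connection for every record it writes. A common alternative is to push the write into rdd.foreachPartition so that one connection serves all records of a partition. The following is only a minimal sketch that could replace the stateDstream.foreachRDD block inside main (it writes the full state each batch rather than just the top 10); the URL, credentials and userbehavior table are the ones used above.

    stateDstream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // one JDBC connection per partition instead of one per record
        val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
        val ps = conn.prepareStatement("insert into userbehavior(page, number) values (?, ?)")
        try {
          records.foreach { case (page, value) =>
            ps.setString(1, page)
            ps.setDouble(2, value)
            ps.executeUpdate()
          }
        } finally {
          ps.close()
          conn.close()
        }
      }
    }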

Fixing the duplicate-storage problem

In the version above, updateStateByKey keeps a cumulative total per page, and every batch the current top-10 running totals are inserted into MySQL again, so the same pages pile up repeatedly in the table. The version below drops the stateful aggregation and instead writes each incoming record to MySQL once, at the point where it is parsed.

package sparkStreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import java.sql.{ DriverManager, PreparedStatement, Connection }

object WebPagePopularityValueCalculator1 {
  def main(args: Array[String]) {
    val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_test_topic")
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicpMap = topics.split(",").map((_, 2)).toMap

    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val popularityData = lines.map { msgLine =>
      // message format: pageID|v1|v2|v3 (pipe-separated, so the regex needs an escaped "|")
      val dataArr: Array[String] = msgLine.split("\\|")
      val pageID = dataArr(0)
      val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
      // write each record to MySQL exactly once, as soon as it is parsed
      toMySql(Iterator((pageID, popValue)))
      (pageID, popValue)
    }
    popularityData.print()
    ssc.start()
    ssc.awaitTermination()

  }

  def toMySql(iterator: Iterator[(String, Double)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into userbehavior(page, number) values (?, ?)"
    try {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
      // prepare the statement once and reuse it for every record
      ps = conn.prepareStatement(sql)
      iterator.foreach { case (page, value) =>
        ps.setString(1, page)
        ps.setDouble(2, value)
        ps.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }

}

A second variant of WebPagePopularityValueCalculator1 follows; it reads comma-separated input and stores each record's raw field values (v1, v2, v3) into a separate userbehaviordatasource table:

package sparkStreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import java.sql.{ DriverManager, PreparedStatement, Connection }

object WebPagePopularityValueCalculator1 {
  def main(args: Array[String]) {
    val Array(zkQuorum, group, topics) = Array("localhost:2181", "1", "sun_test_topic")
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicpMap = topics.split(",").map((_, 2)).toMap

    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
    val popularityData = lines.map { msgLine =>
      // message format: pageID,v1,v2,v3 (comma-separated in this variant)
      val dataArr: Array[String] = msgLine.split(",")
      val pageID = dataArr(0)
      val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
      // store the page ID together with its three raw source values
      toMySql(Iterator((pageID, dataArr(1).toDouble, dataArr(2).toDouble, dataArr(3).toDouble)))
      (pageID, popValue)
    }
    popularityData.print()
    ssc.start()
    ssc.awaitTermination()

  }

  def toMySql(iterator: Iterator[(String, Double, Double, Double)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "insert into userbehaviordatasource(page, v1, v2, v3) values (?, ?, ?, ?)"
    try {
      Class.forName("com.mysql.jdbc.Driver")
      conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
      // prepare the statement once and reuse it for every record
      ps = conn.prepareStatement(sql)
      iterator.foreach { case (page, v1, v2, v3) =>
        ps.setString(1, page)
        ps.setDouble(2, v1)
        ps.setDouble(3, v2)
        ps.setDouble(4, v3)
        ps.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (ps != null) {
        ps.close()
      }
      if (conn != null) {
        conn.close()
      }
    }
  }

}
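
For reference, the two tables referenced by the INSERT statements could be created as sketched below. Only the database name, table names and column names come from the code above; the column types, and the idea of creating the tables from a small JDBC helper, are assumptions.

import java.sql.DriverManager

object CreateTables {
  def main(args: Array[String]): Unit = {
    // connection details copied from the streaming jobs above
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "Sun@123")
    val stmt = conn.createStatement()
    try {
      // table used by the first two programs (page plus its popularity value)
      stmt.executeUpdate(
        "create table if not exists userbehavior(page varchar(64), number double)")
      // table used by the last program (page plus its three raw source values)
      stmt.executeUpdate(
        "create table if not exists userbehaviordatasource(page varchar(64), v1 double, v2 double, v3 double)")
    } finally {
      stmt.close()
      conn.close()
    }
  }
}
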
Original article: https://www.cnblogs.com/sunyaxue/p/6544033.html