Spark Streaming + Kafka, with MySQL as the data source

The producer program below uses Spark SQL's JDBC data source to read a MySQL table and publishes each row as a message to a Kafka topic; a Spark Streaming job can then consume that topic (a consumer sketch follows the producer code).
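The original post lists no build configuration. Assuming a Spark 1.6-era setup (matching the SQLContext and spark-streaming-kafka imports used below), the sbt dependencies might look roughly like this; every version number here is an assumption, not taken from the post:

// build.sbt (illustrative versions only; adjust to your Spark/Scala/Kafka installation)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.3",
  "org.apache.spark" %% "spark-sql"             % "1.6.3",
  "org.apache.spark" %% "spark-streaming"       % "1.6.3",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3",  // Kafka 0.8 integration
  "org.apache.kafka"  % "kafka-clients"         % "0.8.2.2", // new producer API used below
  "mysql"             % "mysql-connector-java"  % "5.1.38"   // JDBC driver for the MySQL source
)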

package spark

import java.util.{HashMap, Properties}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SelectFromOneTable {
  def main(args: Array[String]): Unit = {
    val brokers = "localhost:9092"
    val topic = "sun_test"

    // Configuration for the new Kafka producer API; keys and values are sent as plain strings.
    // (The legacy "serializer.class" / "producer.type" settings from the old producer API are
    // not used by org.apache.kafka.clients.producer.KafkaProducer and have been dropped.)
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    // Spark SQL context used to read the source table over JDBC.
    val sparkConf = new SparkConf().setAppName("Spark SQL Test Case").setMaster("local")
    val sparkContext = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sparkContext)

    // Connection info for the MySQL source; credentials are passed in the URL here.
    val url = "jdbc:mysql://localhost:3306/sun_test?user=root&password=Sun@123"
    val prop = new Properties()

    // Load the whole "flow" table into the driver as an Array[Row].
    val rows = sqlContext.read.jdbc(url, "flow", prop).collect()

    // Publish each row to the Kafka topic as a string-encoded message.
    for (row <- rows) {
      println(row)
      val message = new ProducerRecord[String, String](topic, null, row.toString())
      producer.send(message)
    }

    producer.close()
    sparkContext.stop()
  }
}  
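The code above only covers the producer side (MySQL to Kafka). For the "streaming" half of the title, a minimal consumer sketch using the spark-streaming-kafka (Kafka 0.8) direct-stream integration could look like the following; the object name ReadFromKafkaTopic, the local[2] master, and the 5-second batch interval are assumptions, not from the original post:

package spark

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReadFromKafkaTopic {
  def main(args: Array[String]): Unit = {
    // Local mode with two threads and a 5-second batch interval (both illustrative).
    val sparkConf = new SparkConf().setAppName("Kafka Streaming Test").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val topics = Set("sun_test")

    // Each record arrives as a (key, value) pair; the value is the Row string sent by the producer.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.map(_._2).print()

    ssc.start()
    ssc.awaitTermination()
  }
}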
Original post: https://www.cnblogs.com/sunyaxue/p/6547168.html