医养调度项目

1.集群启动

   集群共三个节点,min01,min02,min03

    网络启动(桥接) : 关闭桥接2,只留桥接三

    在 min01 节点 ,sh  start.sh 启动 hdfs,yarn ,spark

           sh zk.sh 启动zk集群(三台同时操作)

         sh kafka.sh 启动kafka集群,三台同时操作,

简单的kafka脚本
/root/apps/kafka_2.11-0.11.0.2/bin/kafka-console-producer.sh  --broker-list min01:9092,min02:9092,min03:9092 --topic spark_streaming_test
生产消息

/root/apps/kafka_2.11-0.11.0.2/bin/kafka-console-consumer.sh --zookeeper min01:2181,min02:2181,min03:2181 --topic spark_streaming_test --from-beginning

接受消息




/root/apps/kafka_2.11-0.11.0.2/bin/kafka-topics.sh --zookeeper min01:2181,min02:2181,min03:2181 --topic sst --create --replication-factor 2 --partitions 3

创建主题
工具类-zk

/**
 * @Author: 唐
 * @Date: 2020/3/25 20:19
 */
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkMarshallingError
import org.I0Itec.zkclient.serialize.ZkSerializer
object ZKUtil {
  def initZKClient(zkServers : String,sessionTimeout : Int,connectionTimeout: Int): ZkClient ={
    new ZkClient(zkServers,sessionTimeout,connectionTimeout,new ZkSerializer {
      override def serialize(data: scala.Any): Array[Byte] = {
        try {
          data.toString.getBytes("UTF-8")
        } catch  {
          case _: ZkMarshallingError => null
        }
      }

      override def deserialize(bytes: Array[Byte]): AnyRef = {
        try{
          new String(bytes,"UTF-8")
        }catch {
          case _: ZkMarshallingError => null
        }
      }
    })
  }

}
代码工具类-数据连接池
/**
 * @Author: 唐
 * @Date: 2020/3/25 23:28
 */
import java.sql.{Connection,DriverManager}
import java.util.concurrent.ConcurrentLinkedDeque
object ConnectionPool {
  private var queue: ConcurrentLinkedDeque[Connection] = _
  Class.forName("com.mysql.jdbc.Driver")
  def getConnection()={
    if (queue == null) queue=new ConcurrentLinkedDeque[Connection]()
    if (queue.isEmpty){
      for (i<- 1 to 10){
        val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/syllabus?useUnicode=true&characterEncoding=utf8", "root", "123456")
        conn.setAutoCommit(false)
        queue.offer(conn)
      }
    }
    queue.poll()
  }
  def returnConnection(conn:Connection)={
    queue.offer(conn)
  }
}

代码逻辑

/**
 * @Author: 唐松怀
 * @Date: 2020/3/25 22:32
 */
/**
 * @Author: 唐松怀
 * @Date: 2020/3/25 20:12
 */



import java.sql.{DriverManager, PreparedStatement}

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, TopicMetadataRequest}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable
import scala.util.{Success, Try}


object test01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Chapter8_4_5")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Durations.seconds(10))

    val topics = Set("spark_streaming_test")
    val kafkaParams = mutable.Map[String, String]()
    kafkaParams.put("bootstrap.servers", "min01:9092,min02:9092,min03:9092")
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("session.timeout.ms", "30000")
    kafkaParams.put("enable.auto.commit", "false")
    kafkaParams.put("max.poll.records", "100")
    kafkaParams.put("kafka.topics", "spark_streaming_test")
    kafkaParams.put("group.id", "g_spark_test")

    val zkHost = "min01:2181,min02:2181,min03:2181"
    val sessionTimeout = 120000
    val connectionTimeout = 60000
    val zkClient = ZKUtil.initZKClient(zkHost, sessionTimeout, connectionTimeout)

    val zkTopic = "spark_streaming_test"
    val zkConsumerGroupId = "g_spark_test"

    val zkTopicDir = new ZKGroupTopicDirs(zkConsumerGroupId, zkTopic)
    val zkTopicPath = zkTopicDir.consumerOffsetDir

    val childrenCount = zkClient.countChildren(zkTopicPath)
    var kafkaStream: InputDStream[(String, String)] = null
    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    kafkaStream = if (childrenCount > 0) {
      val req = new TopicMetadataRequest(topics.toList, 0)
      val leaderConsumer = new SimpleConsumer("min01", 9092, 10000, 10000, "StreamingOffsetObserver")

      val res = leaderConsumer.send(req)
      val topicMetaOption = res.topicsMetadata.headOption

      val partitions = topicMetaOption match {
        case Some(tm) =>
          tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]
        case None =>
          Map[Int, String]()
      }

      for (partition <- 0 until childrenCount) {
        val partitionOffset = zkClient.readData[String](zkTopicPath + "/" + partition)
        val tp = TopicAndPartition(kafkaParams("kafka.topics"), partition)
        val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
        val consumerMin = new SimpleConsumer(partitions(partition), 9092, 10000, 10000, "getMinOffset")
        val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
        var nextOffset = partitionOffset.toLong
        if (curOffsets.nonEmpty && nextOffset < curOffsets.head) {
          nextOffset = curOffsets.head
        }
        fromOffsets += (tp -> nextOffset)
      }

      val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.key, mam.message)
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams.toMap, fromOffsets, messageHandler)
    } else {
      KafkaUtils.createDirectStream[
        String,
        String,
        StringDecoder,
        StringDecoder](ssc, kafkaParams.toMap, topics)
    }

    var offsetRanges: Array[OffsetRange] = null

    val kafkaInputDStream = kafkaStream.transform { rdd => {
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }
    }

    val kafkaValues = kafkaInputDStream.map(_._2)

    val kafkaSplits = kafkaValues.map(_.split(",")).filter(_.length == 4)

    val results =kafkaSplits.map(_.mkString(","))
    results.foreachRDD(rdd => {
      //在Driver端执行
      rdd.foreachPartition(p => {
        //在Worker端执行
        //如果将输出结果保存到某个数据库,可在此处实例化数据库的连接器
        p.foreach(result => {

          val car = result.split(",")(0)
          val longitude = result.split(",")(1)
          val latitude = result.split(",")(2)
          val timestamp = result.split(",")(3)

          val conn=ConnectionPool.getConnection()

          val sql = "INSERT INTO syllabus.t_car_position (plate_num,longitude,latitude,timestamp ) values (?,?,?,? )"

          val statement: PreparedStatement = conn.prepareStatement(sql)

          statement.setString(1,car)
          statement.setString(2,longitude)
          statement.setString(3,latitude)
          statement.setString(4,timestamp)

          statement.addBatch()
          statement.executeBatch()

          conn.commit()
          ConnectionPool.returnConnection(conn)
          println(result)

        })
      })
      //ZkUtils不可序列化,所以需要在Driver端执行
      for (o <- offsetRanges) {
        ZkUtils.updatePersistentPath(zkClient, zkTopicDir.consumerOffsetDir + "/" + {
          o.partition
        }, o.fromOffset.toString)
        println("本次消息消费成功后,偏移量状态:" + o)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
View Code

 mysql     INSERT INTO syllabus.t_car_position (plate_num,longitude,latitude,timestamp) values (20,23.3,65.0,56);  相关信息

消费测试数据

京A12345,1144444444446.46,3555555555559.92,15550666666666666731094000
京A666,886.46,99.93,15550731094001
京A90,1991.46,696.93,15550731094003
A12345,116.46,39.93,15550731094003
N,45,89,100
A,1,2,3,4
B,3,4,5,6
C,5,6,7,8
D,7,8,9,4
RUSH B
原文地址:https://www.cnblogs.com/tangsonghuai/p/12577111.html