Spark Streaming Example: Consume Kafka Data, Compute Metrics, and Write the Results to HBase and MySQL

package com.lg.blgdata.streaming

import java.lang.{Double, Long}
import java.sql.{CallableStatement, Connection, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat
import java.util.Calendar

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import com.lg.bigdata.utils.JZWUtil
import com.lg.blgdata.utils.DruidConnectionPool

/**
 * 1. Creates the Driver (stateless):
 *   (1) Aggregates the per-lane Kafka stream into section-level 1-minute records in HBase.
 *   (2) At the same time, computes each section's on-network vehicle count and updates
 *       MySQL: car_num_online -> (cam_ID, now_car_num)
 */
object KafkaHbaseMinute1 {
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")

  val tableName = "jzw_data:section_1min"
  val conf = HBaseConfiguration.create()

  def main(args: Array[String]): Unit = {
    //val groupId = "lane_test"             // for testing
    val groupId = "lane_data"               // consumer group used to build the section-level data

    // 1. Create the SparkConf and initialize the StreamingContext (60 s batches)
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaHbaseMinute1")
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    ssc.sparkContext.setLogLevel("WARN")

    /* 2. Kafka parameters, expressed as a Map. auto.offset.reset semantics:
     *  earliest - if a partition has a committed offset, resume from it; otherwise consume from the beginning
     *  latest   - if a partition has a committed offset, resume from it; otherwise consume only newly produced data
     *  none     - if every partition has a committed offset, resume from them; if any partition lacks one, throw an exception
     */
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop104:9092",             // Kafka broker address
      "key.deserializer" -> classOf[StringDeserializer],   // key deserializer
      "value.deserializer" -> classOf[StringDeserializer], // value deserializer
      "group.id" -> groupId,                               // consumer group
      "auto.offset.reset" -> "earliest",                   // earliest | latest | none
      "enable.auto.commit" -> (true: java.lang.Boolean)    // let the consumer commit offsets automatically
    )

    val topics = Array("car")

    // 3. Create the direct stream via KafkaUtils.
    // This is the officially recommended direct approach: it consumes Kafka's
    // low-level API directly and is more efficient than receiver-based streams.
    val kafkaDSteam = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // Schema of the incoming records
    val schema = StructType(List(
      StructField("cameraId", StringType),
      StructField("time", StringType),
      StructField("lane_position", StringType),
      StructField("carType", StringType),
      StructField("speed", StringType),
      StructField("space", StringType)))

    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat]) // output format for the HBase writes
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    var spark: SparkSession = null
							
    /**
     * Parse each JSON message into a Row, build an RDD[Row],
     * then create a DataFrame from it via the schema.
     */
    kafkaDSteam.map(record => JZWUtil.handlerMessage2Row(record.value())).foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Take a single snapshot of the minute boundaries so every use below sees the same window
        val timeMap = getTime
        if (spark == null) {
          spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        }
        val df: DataFrame = spark.createDataFrame(rdd, schema)

        // Keep only the records that fall inside the previous whole minute
        val dfV = df.filter("time >= " + timeMap.get("startDate").get + " and time < " + timeMap.get("thisDate").get).toDF()
        val array = dfV.select("time", "cameraId", "speed", "space")
        val rows = array.collect() // pull this minute's rows to the driver
        val idCount = array.groupBy("cameraId").count().collect()

        // Assemble one record per camera (i.e. per road section)
        var rsList = new ListBuffer[mutable.Map[String, String]]()
        idCount.foreach(x => {
          var map: mutable.Map[String, String] = mutable.Map()
          map("key") = timeMap.get("thisDate").get + "" + x.get(0).toString().replace("V", "")
          map("time") = format.format(timeMap.get("thisDate").get) // minute timestamp
          map("cameraId") = x.get(0).toString()                    // camera id
          map("car_num") = x.get(1).toString()                     // traffic volume

          // Sum speed and headway for this camera on the driver. (The original post
          // accumulated these by mutating a broadcast variable inside array.foreach;
          // that only appears to work in local mode, because changes an executor makes
          // to a broadcast value are never shipped back to the driver.)
          var speedSum = 0.0 // sum of speeds, km/h
          var spacing = 0.0  // sum of headway distances, m
          rows.foreach(data => {
            if (data.get(1).equals(x.get(0))) {
              speedSum += Double.valueOf(data.get(2).toString())
              spacing += Double.valueOf(data.get(3).toString())
            }
          })

          val carMaps = JZWUtil.contentMatch(x.get(0).toString())
          val carNum = Double.valueOf(map("car_num"))

          // Average headway distance
          map("now_avg_space") = (spacing / carNum).formatted("%.2f")

          // Average speed
          map("now_avg_speed") = (speedSum / carNum).formatted("%.2f")

          // Section lane density: (1 km / average headway in km) / 3 lanes
          map("now_avg_densit") = ((1 / (spacing / carNum / 1000)) / 3).formatted("%.2f")

          // Congestion index: |(1 - average speed / 80 km/h limit) * 10|
          map("now_avg_TPI") = Math.abs(Double.valueOf(((1 - (speedSum / carNum / 80)) * 10).formatted("%.2f"))).toString()

          // Travel time: detection-unit length / average speed, in minutes
          val dzLength = Double.parseDouble(carMaps.get("dZLength").get.toString()) // dZLength: detection-unit length
          var now_avg_passTime = scala.math.round(dzLength / (speedSum / carNum) * 60).toString()
          // If the computed travel time is <= 0, fall back to (detection-unit length / 80) * 60
          if (Double.valueOf(now_avg_passTime) <= 0) {
            now_avg_passTime = ((Double.valueOf(dzLength) / 80) * 60).formatted("%.2f")
          }
          map("now_avg_passTime") = now_avg_passTime

          // Queue length: a section counts as queued when its average speed is
          // below 20 km/h and its average headway is below 15 m
          if (Double.parseDouble(map.get("now_avg_speed").get) < 20 &&
              spacing / Double.valueOf(x.get(1).toString()) < 15) {
            map("nwo_len_que") = carMaps.get("cDLength").get
          } else {
            map("nwo_len_que") = "0"
          }
          rsList.+=(map)
        })

        // Back to an RDD
        val rdds = ssc.sparkContext.parallelize(rsList)

        // Wrap each record in a Put
        val dataRDD = rdds.map(x => convert(x))

        // Save into HBase
        dataRDD.saveAsHadoopDataset(jobConf)
        //================== The part below computes the on-network vehicle info ==================
        // Obtain a database connection (from the connection pool)
        var connection: Connection = null
        var statement: PreparedStatement = null
        var resultSet: ResultSet = null
        var pstm: CallableStatement = null

        try {
          if (connection == null) {
            connection = DruidConnectionPool.getConnection
          }
          // Open a transaction; do not auto-commit
          connection.setAutoCommit(false)

          val querySql = "SELECT DISTINCT cam_ID,len_detect_zone FROM detect_zone_ZH_len"
          statement = connection.prepareStatement(querySql)
          resultSet = statement.executeQuery()
          var rs = new ListBuffer[mutable.Map[String, String]]()
          // Walk the result set
          while (resultSet.next()) {
            val cam_ID = resultSet.getString(1)       // current camera
            val len_detect_zone = resultSet.getInt(2) // detection-unit length

            var sqMap: mutable.Map[String, String] = mutable.Map()
            sqMap("cam_ID") = cam_ID
            // On-network vehicle count = detection-unit density * detection-unit length * 3 / 1000
            rsList.foreach(x => {
              if (x.get("cameraId").getOrElse("0").equals(cam_ID)) {
                sqMap("now_car_num") = Math.abs(scala.math.round(Double.valueOf(Double.valueOf(x.get("now_avg_densit").getOrElse("0")) * len_detect_zone * 3 / 1000.0))).toString()
              }
            })
            if (sqMap.get("now_car_num").isEmpty) { // camera has no on-network vehicles: default to 0
              sqMap("now_car_num") = "0"
            }
            rs = rs.+=(sqMap)
          }

          // Write the on-network vehicle info back to MySQL
          val upsql = "update jzwdata.car_num_online set now_car_num=? where cam_ID=?"
          pstm = connection.prepareCall(upsql)
          // Bind the parameters
          for (t <- rs) {
            pstm.setString(1, t.get("now_car_num").get) // on-network vehicle count
            pstm.setString(2, t.get("cam_ID").get)      // camera ID
            //pstm.executeUpdate()                      // per-row commit (fine for small volumes)
            pstm.addBatch()                             // batch the updates instead
          }
          pstm.executeBatch() // submit the whole batch after the loop
          // Commit the transaction
          connection.commit()
        } catch {
          case e: Exception => {
            // Roll back the transaction
            if (connection != null) {
              connection.rollback()
            }
            // Stop Spark Streaming gracefully
            ssc.stop(true)
            throw new RuntimeException("Exception while querying historical on-network vehicle data!")
          }
        } finally {
          if (resultSet != null) {
            resultSet.close()
          }
          if (statement != null) {
            statement.close()
          }
          if (pstm != null) {
            pstm.close()
          }
          if (connection != null) {
            connection.close()
          }
        }
      }
    })
    // Start the streaming job
    ssc.start()

    // The Driver waits for the streaming job; when the job terminates, so does the Driver
    ssc.awaitTermination()
  }
  
  // Snapshot of the current time, truncated to the whole minute
  def getTime(): mutable.Map[String, Long] = {
    val date: Calendar = Calendar.getInstance()
    val indexMinute = sdf.format(date.getTime()) + ":00"
    val times: Long = format.parse(indexMinute).getTime()

    val sdate: Calendar = Calendar.getInstance()
    sdate.setTime(format.parse(indexMinute))
    sdate.add(Calendar.MINUTE, -1)

    var map: mutable.Map[String, Long] = mutable.Map()
    map("startDate") = sdate.getTimeInMillis // one minute before the current whole minute
    map("thisDate") = times                  // the current whole minute
    (map)
  }
  
  // Builds the Put object that writes one record into HBase
  def convert(map: mutable.Map[String, String]) = {
    // 1. Build the Put keyed by the rowkey (minute timestamp + camera number)
    val put = new Put(Bytes.toBytes(map.get("key").get))

    // Average headway distance
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("now_avg_space"), Bytes.toBytes(map.get("now_avg_space").get))

    // Traffic volume
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("car_num"), Bytes.toBytes(map.get("car_num").get))

    // Timestamp
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("time"), Bytes.toBytes(map.get("time").get))

    // Average speed (sum of lane speeds / number of vehicles)
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("now_avg_speed"), Bytes.toBytes(map.get("now_avg_speed").get))

    // Current lane density (1 km / average headway, per lane)
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("now_avg_densit"), Bytes.toBytes(map.get("now_avg_densit").get))

    // Congestion index ((1 - average speed / speed limit) * 10)
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("now_avg_TPI"), Bytes.toBytes(map.get("now_avg_TPI").get))

    // Camera id
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("cameraId"), Bytes.toBytes(map.get("cameraId").get))

    // Queue length (column qualifier kept as in the original table: "nwo_len_que")
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("nwo_len_que"), Bytes.toBytes(map.get("nwo_len_que").get))

    // Travel time
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("now_avg_passTime"), Bytes.toBytes(map.get("now_avg_passTime").get))

    (new ImmutableBytesWritable, put)
  }
}
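
To make the metric formulas in the job concrete, here is a worked example with made-up numbers (30 vehicles in the minute, a speed sum of 1500 km/h, a headway sum of 600 m); the constants 3 (lanes) and 80 (km/h speed limit) are the ones hard-coded in the job:

// Worked example: 30 vehicles this minute, speed sum 1500 km/h, headway sum 600 m
val carNum   = 30.0
val speedSum = 1500.0                             // sum of speeds, km/h
val spaceSum = 600.0                              // sum of headway distances, m

val avgSpace = spaceSum / carNum                  // 20.00 m     -> now_avg_space
val avgSpeed = speedSum / carNum                  // 50.00 km/h  -> now_avg_speed
val density  = (1 / (avgSpace / 1000)) / 3        // 16.67 vehicles/km per lane -> now_avg_densit
val tpi      = math.abs((1 - avgSpeed / 80) * 10) // 3.75        -> now_avg_TPI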

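The post does not include JZWUtil. A minimal sketch of what its handlerMessage2Row could look like, assuming each Kafka message is a flat JSON object carrying exactly the six schema fields (the Jackson usage and the JSON layout are assumptions, not the author's code):

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.sql.Row

object JZWUtilSketch {
  private val mapper = new ObjectMapper()

  // Hypothetical stand-in for JZWUtil.handlerMessage2Row: maps one Kafka JSON
  // payload onto a Row in the same field order as the schema in main()
  def handlerMessage2Row(message: String): Row = {
    val node = mapper.readTree(message)
    Row(
      node.path("cameraId").asText(),      // e.g. "V101"
      node.path("time").asText(),          // epoch millis, as a string
      node.path("lane_position").asText(),
      node.path("carType").asText(),
      node.path("speed").asText(),         // km/h, as a string
      node.path("space").asText())         // headway distance in meters, as a string
  }
}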
  
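DruidConnectionPool is likewise external to the post. A plausible minimal implementation on top of Alibaba Druid (the JDBC URL, credentials, and pool sizes below are placeholders, not the author's configuration):

import java.sql.Connection
import com.alibaba.druid.pool.DruidDataSource

object DruidConnectionPool {
  // One shared pool per JVM; Druid hands out pooled java.sql.Connection objects
  private val ds = new DruidDataSource()
  ds.setDriverClassName("com.mysql.jdbc.Driver")
  ds.setUrl("jdbc:mysql://hadoop104:3306/jzwdata?useSSL=false") // placeholder host/db
  ds.setUsername("root")                                        // placeholder credentials
  ds.setPassword("******")
  ds.setInitialSize(5)
  ds.setMaxActive(20)

  def getConnection: Connection = ds.getConnection
}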

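Finally, saveAsHadoopDataset assumes the target table already exists. A one-off helper that creates jzw_data:section_1min with the info column family, using the HBase 1.x admin API to match the Put.addImmutable calls above (the namespace, table, and family names come from the code; everything else is a sketch):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, NamespaceDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateSectionTable {
  def main(args: Array[String]): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val admin = conn.getAdmin
    try {
      // Create the "jzw_data" namespace if it is missing
      if (!admin.listNamespaceDescriptors().exists(_.getName == "jzw_data")) {
        admin.createNamespace(NamespaceDescriptor.create("jzw_data").build())
      }
      // Create the table with the single "info" column family used by convert()
      val table = TableName.valueOf("jzw_data:section_1min")
      if (!admin.tableExists(table)) {
        val desc = new HTableDescriptor(table)
        desc.addFamily(new HColumnDescriptor("info"))
        admin.createTable(desc)
      }
    } finally {
      admin.close()
      conn.close()
    }
  }
}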