SparkStreaming自定义采集器采集Hbase数据并计算后往Hbase写数据案列(2)

案列一:

package com.lg.bigdata.streaming

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes
import com.lg.bigdata.utils.JZWUtil
import java.util.Calendar
import java.text.SimpleDateFormat
import scala.collection.mutable
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.RowFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client.Connection
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import java.util.LinkedHashMap
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.DataFrame

/**
 * 	一.5分钟数合成 :车道级,断面级
 * 	从hbase拿取1分钟车道级数据合成5分钟断面级数据存入hbase
 * 	 要求:
 *        (1)格式保持与1分钟一致
 *
 *
 */
object HbaseSectionMinute5Data {
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val fmtminute = new SimpleDateFormat("mm")

  val conf = HBaseConfiguration.create()

  val tableNameInPut = "jzw_data:spot_jt_para_1min" //输出表
  val tableNameOutPut = "jzw_data:spot_jt_para_5min" //输入表→车道级
  val sectionTableNameOutPut = "jzw_data:spot_jt_section_5min" //输入表→断面级

  var connection: Connection = null
  var thism: String = null

  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HbaseSectionMinute5Data")
    config.set("spark.streaming.receiverRestartDelay", "60000"); //设置Receiver启动频率,每5s启动一次(单位:毫秒)
    val sc = new SparkContext(config)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(60)) //设置Spark时间窗口,每5分钟处理一次300
    
    val rddStream: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
    rddStream.print(1) //强行触发action操作//强行触发action操作

    //导入所需的类型
    import org.apache.spark.sql.types._
    //常见的聚合函数
    import org.apache.spark.sql.functions._
    //数据类型
    val schema = StructType(List(
      StructField("car_num", StringType),
      StructField("lane_position", StringType),
      StructField("now_avg_speed", StringType),
      StructField("now_avg_densit", StringType),
      StructField("now_avg_passTime", StringType),
      StructField("now_avg_TPI", StringType),
      StructField("now_avg_space", StringType),
      StructField("cameraId", StringType),
      StructField("time", StringType)))

   

     val jobConf = new JobConf(conf)
         jobConf.setOutputFormat(classOf[TableOutputFormat])//输出数据的类型
         jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableNameOutPut)

    val sectionjobConf = new JobConf(conf)
         sectionjobConf.setOutputFormat(classOf[TableOutputFormat])//输出数据的类型
         sectionjobConf.set(TableOutputFormat.OUTPUT_TABLE, sectionTableNameOutPut)

    //属性提取
    var spark: SparkSession=null
    var dataFrame: DataFrame=null

    var line: Array[Row]=null
    var sectionline: Array[Row]=null

    var rsList:ListBuffer[mutable.Map[String, String]] = null
    var sectionList:ListBuffer[mutable.Map[String, String]] = null

    var map: mutable.Map[String, String] = null
    var smap: mutable.Map[String, String] = null

    rddStream.map(record =>JZWUtil.handlerMessage2Row(record)).foreachRDD(rdd => {
      if (rdd.count() > 0) {
        println("数据条数=======:" + rdd.count())
        val time=thism
        //时间获取
        val elong = format.parse(time).getTime()
        if(spark==null){
           spark= SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        }
        //rdd转DataFrame
        dataFrame= spark.createDataFrame(rdd, schema)

                    /*  一.5分钟车道级别
  						       * 	聚合分组   :摄像头 车道
  						       *  car_num :车流量和
  						       *  now_avg_speed:速度和
  						       *  now_avg_densit:密度和
  						       *  now_avg_passTime:通行时间和
  						       *  now_avg_TPI:排队长度和
  						       *  now_avg_space:车间距和
  						       **/
          line = dataFrame.groupBy("cameraId", "lane_position").agg(
          sum("car_num"),
          round((sum("now_avg_speed") / count("cameraId")), 2),
          round((sum("now_avg_densit") / count("cameraId")), 2),
          round((sum("now_avg_passTime") / count("cameraId")), 2),
          round((sum("now_avg_TPI") / count("cameraId")), 2),
          round((sum("now_avg_space") / count("cameraId")), 2)).collect()

          //二.5分钟断面级别
        sectionline= dataFrame.groupBy("cameraId").agg(
          sum("car_num"),
          round((sum("now_avg_speed") / count("cameraId")), 2),
          round((sum("now_avg_densit") / count("cameraId")), 2),
          round((sum("now_avg_passTime") / count("cameraId")), 2),
          round((sum("now_avg_TPI") / count("cameraId")), 2),
          round((sum("now_avg_space") / count("cameraId")), 2)).collect()

        //车道级别数据计算
        println("车道级别行数:" + line.size)
        rsList=new  ListBuffer[mutable.Map[String, String]]()
        var lane: Int = 0
        line.foreach(x ⇒ {
                              map= mutable.Map()
  						       	        /*  rowkey设计:
															 * 	列:00111607048520000
															 *   001 :前三位为摄像头编号去'V'
															 *   1  :第四位 (L:1  M:2  R:3)
															 *   1607048520000:第四位之后的部分为精确到分钟的4时间戳
															 **/

															 if(x.apply(1).equals("L")){
															   lane=1
															 }else if(x.apply(1).equals("M")){
															   lane=2
															 }else{
															   lane=3
															 }

															    map("key") = (x.apply(0).toString().replace("V", "") + lane + elong).toString() //hbase rowkey这里分钟数是5的倍数
																	map("car_num") = x.apply(2).toString() //车流量
																	map("time") = time //时间(2020-12-02 19:49:00)
																	map("lane_position") = x.apply(1).toString()//车道类别
																	map("now_avg_speed") = x.apply(3).toString()//速度
																	map("now_avg_densit") = x.apply(4).toString()//当前车道密度(1分钟求平均)
																	map("now_avg_passTime") = x.apply(5).toString()//通行时间(1分钟求平均)
																	map("now_avg_TPI") =x.apply(6).toString()//拥堵度(1分钟求平均)
																	map("now_avg_space") = x.apply(7).toString()//车间距(1分钟求平均)
																	map("cameraId") = x.apply(0).toString()
																	rsList.append(map)
        })
        println("rsList条数:" + rsList.size)
        var newrdd = sc.parallelize(rsList) //结果数据转rdd
        
       val dataRDD=newrdd.map(x⇒{
             
            JZWUtil.convert(x,5)
          })
         //一.保存5分钟的车道级数据.
       dataRDD.saveAsHadoopDataset(jobConf)

         //二.计算5分钟的断面级数据
         sectionList = new ListBuffer[mutable.Map[String, String]]()
         sectionline.foreach(x ⇒ {
                              smap= mutable.Map()
  						       	   /*       rowkey设计:
															 * 	列:00111607048520000
															 *   001 :前三位为摄像头编号去'V'
															 *   1  :第四位 (L:1  M:2  R:3)
															 *   1607048520000:第四位之后的部分为精确到分钟的时间戳
															 **/

															    smap("key") =elong+(x.apply(0).toString().replace("V", "")).toString() //hbase rowkey这里分钟数是5的倍数
																	smap("car_num") = x.apply(1).toString() //车流量
																	smap("time") = time //时间(2020-12-02 19:49:00)
																	smap("now_avg_speed") = x.apply(2).toString()//速度
																	smap("now_avg_densit") = x.apply(3).toString()//当前车道密度(1分钟求平均)
																	smap("now_avg_passTime") = x.apply(4).toString()//通行时间(1分钟求平均)
																	smap("now_avg_TPI") =x.apply(5).toString()//拥堵度(1分钟求平均)
																	smap("now_avg_space") = x.apply(6).toString()//车间距(1分钟求平均)
																	smap("cameraId") = x.apply(0).toString()
																	sectionList.append(smap)
        })
        println("sectionList条数:" + sectionList.size)
        //结果数据转rdd
        val Sectionrdd = sc.parallelize(sectionList)
       
        //rdd转为habse可存储格式
        val dataSectionRDD=Sectionrdd.map(x⇒{
            JZWUtil.convert(x,6)
          })

        dataSectionRDD.saveAsHadoopDataset(sectionjobConf)
        }
    })

    ssc.start()
    ssc.awaitTermination()
    println("connection关闭")
    connection.close()
  }

  /*
	 *   .由于没有读取HBase的Stream接口,需要一个自定义的Receiver用于查询HBase数据类
	 *   MyReceiver:自定义Receiver通过私有方法receive()方法读取HBase数据并调用store(b.toString())将数据写入DStream。
	 * 	申明采集器
	 *  1.继承Receiver 参数是存储级别
	 *  2.重写方法onStart,onStop
	 */
  class MyReceiver() extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    override def onStart(): Unit = {
      receive()
    }

    override def onStop(): Unit = {

    }

    private def receive(): Unit = {
      if (connection == null) {
        connection = ConnectionFactory.createConnection(conf)
      }
        
      //获取当前分钟数
      val edate: Calendar = Calendar.getInstance()
      val endMinute = sdf.format(edate.getTime()) + ":00"
      thism = endMinute
     println("时间:" + thism)
      val elong = format.parse(thism).getTime()
      val Minute = fmtminute.format(elong)
      if (Integer.valueOf(Minute) % 5 == 0) {
        println("IFSTART::>>>"+thism)
        val date: Calendar = Calendar.getInstance()

        val admin = connection.getAdmin;

        val table = new HTable(conf, tableNameInPut)
        val scan = new Scan()
        scan.setCacheBlocks(false)
        scan.addFamily(Bytes.toBytes("info"))

        val rowkey_1 = "." + elong //5
        val rowkey_2 = "." + getTime.get("rowkey_2").get //4
        val rowkey_3 = "." + getTime.get("rowkey_3").get //3
        val rowkey_4 = "." + getTime.get("rowkey_4").get //2
        val rowkey_5 = "." + getTime.get("rowkey_5").get //1

        /*
      						 *   摄像头前缀获取数据,
      						 * MUST_PASS_ONE: 取并集 相当于or 操作
      						 * MUST_PASS_ALL: 取交集 相当一and操作
      						 */
        val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        val rowFilter1: RowFilter = new RowFilter(
          CompareFilter.CompareOp.EQUAL,
          new RegexStringComparator(rowkey_1))

        filterList.addFilter(rowFilter1)

        val rowFilter2: RowFilter = new RowFilter(
          CompareFilter.CompareOp.EQUAL,
          new RegexStringComparator(rowkey_2))

        filterList.addFilter(rowFilter2)

        val rowFilter3: RowFilter = new RowFilter(
          CompareFilter.CompareOp.EQUAL,
          new RegexStringComparator(rowkey_3))

        filterList.addFilter(rowFilter3)

        val rowFilter4: RowFilter = new RowFilter(
          CompareFilter.CompareOp.EQUAL,
          new RegexStringComparator(rowkey_4))

        filterList.addFilter(rowFilter4)

        val rowFilter5: RowFilter = new RowFilter(
          CompareFilter.CompareOp.EQUAL,
          new RegexStringComparator(rowkey_5))

        filterList.addFilter(rowFilter5)

        scan.setFilter(filterList)

        val rs = table.getScanner(scan)
        val iterator = rs.iterator()

        while (iterator.hasNext) {
          val result = iterator.next();
          val b = new StringBuilder
          b.append("{")
          b.append(""car_num":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("car_num"))) + """) //流量
          b.append(",")
          b.append(""lane_position":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("lane_position"))) + """) //车道
          b.append(",")
          b.append(""now_avg_speed":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_speed"))) + """) //速度
          b.append(",")
          b.append(""now_avg_densit":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_densit"))) + """) //密度
          b.append(",")
          b.append(""now_avg_passTime":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_passTime"))) + """) //通行时间
          b.append(",")
          b.append(""now_avg_TPI":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_TPI"))) + """) //拥堵度
          b.append(",")
          b.append(""now_avg_space":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_space"))) + """) //车间距
          b.append(",")
          b.append(""cameraId":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("cameraId"))) + """) //摄像头
          b.append(",")
          b.append(""time":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("time"))) + """) //摄像头
          b.append("}")
          store(b.toString())
        }
        table.close()
      }
      restart("Trying to connect again")
    }
  }

  def getTime(): mutable.Map[String, Long] = {
    var map: mutable.Map[String, Long] = mutable.Map()

    //0
    val date: Calendar = Calendar.getInstance()
    val Minute_1 = sdf.format(date.getTime()) + ":00"
    val dlong_1: Long = format.parse(Minute_1).getTime()
    map("rowkey_1") = dlong_1

    //1
    date.add(Calendar.MINUTE, -1) // 当前时间减1分钟
    val Minute_2 = sdf.format(date.getTime()) + ":00"
    val dlong_2: Long = format.parse(Minute_2).getTime()
    map("rowkey_2") = dlong_2

    //2
    date.add(Calendar.MINUTE, -1) // 当前时间减2分钟
    val Minute_3 = sdf.format(date.getTime()) + ":00"
    val dlong_3: Long = format.parse(Minute_3).getTime()
    map("rowkey_3") = dlong_3

    //3
    date.add(Calendar.MINUTE, -1) // 当前时间减3分钟
    val Minute_4 = sdf.format(date.getTime()) + ":00"
    val dlong_4: Long = format.parse(Minute_4).getTime()
    map("rowkey_4") = dlong_4

    //4
    date.add(Calendar.MINUTE, -1) // 当前时间减4分钟
    val Minute_5 = sdf.format(date.getTime()) + ":00"
    val dlong_5: Long = format.parse(Minute_5).getTime()
    map("rowkey_5") = dlong_5
    (map)
  }
}

  

案列二:

package com.lg.bigdata.streaming

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes
import com.lg.bigdata.utils.JZWUtil
import com.alibaba.fastjson.JSON
import java.util.Calendar
import java.text.SimpleDateFormat
import scala.collection.mutable
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.RowFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.spark.SparkContext
import java.util.LinkedHashMap
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client.Connection
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import java.util.LinkedHashMap
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import java.lang.Long
import java.lang.Double
import org.apache.hadoop.hbase.client.Put

/**
 * 	一.5分钟预测数据合成 :断面级
 * 	从hbase拿取5分钟断面级数据预测5分钟断面数据存入hbase
 * 	 要求:
 *        (1)参数:时间/摄像头/车流量/速度/密度/拥堵度
 *
 *
 */
object PredSectionMinute5Data {
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val fmtminute = new SimpleDateFormat("mm")
  
  var sc: SparkContext=null
  val conf = HBaseConfiguration.create()
  
  val tableNameInPut = "jzw_data:spot_jt_section_5min" //输出表→断面级
  val predSectionTableNameOut= "jzw_data:spot_jt_section_5min_pred"//输入表→断面级预测
  
  var connection: Connection = null
  var thism:String=null
  var predTime:String=null

  def main(args: Array[String]): Unit = {
    
    //1.创建spark上下文
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("PredSectionMinute5Data")
    config.set("spark.streaming.receiverRestartDelay", "60000"); //设置Receiver启动频率,每5s启动一次(单位:毫秒)
    sc= new SparkContext(config)
    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Seconds(60)) //设置Spark时间窗口,每1分钟处理一次,时间是5分钟节点的时候计算

    //2.获取DStream类型的节点数据
    val rddStream: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
    rddStream.print(1) //强行触发action操作
    
    //3.导入所需的类型
    import org.apache.spark.sql.types._
    //常见的聚合函数
    import org.apache.spark.sql.functions._
    //数据类型
    val schema = StructType(List(
      StructField("car_num", StringType),
      StructField("now_avg_speed", StringType),
      StructField("now_avg_densit", StringType),
      StructField("now_avg_passTime", StringType),
      StructField("now_avg_TPI", StringType),
      StructField("now_avg_space", StringType),
      StructField("cameraId", StringType),
      StructField("time", StringType)))
    
      //4.配置hbase的输出和输入表
     val jobConf = new JobConf(conf)
         jobConf.setOutputFormat(classOf[TableOutputFormat])//输出数据的类型
         jobConf.set(TableOutputFormat.OUTPUT_TABLE, predSectionTableNameOut)
     
      //5.数据转dataFrame后sql解析
      //属性提取    
      var df: DataFrame=null
      var sortDF: DataFrame=null
      var line: Array[Row]=null
         
      rddStream.map(record => JZWUtil.handlerMessage2Row(record)).foreachRDD(rdd => {
        
        if (rdd.count() > 0) {
          println("数据条数=======:" + rdd.count())
          //时间戳获取
          val predlong = format.parse(predTime).getTime()
          val  spark= SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
              
          //rdd转DataFrame
          df= spark.createDataFrame(rdd, schema)
          
          //2.分组摄像头和对应条数
          line=df.groupBy("cameraId").count().collect()
          
          //把数据按照时间升序排序
          sortDF=df.sort("time")
          
          var resultData = new ListBuffer[mutable.Map[String, String]]() //结果数据统计
          line.foreach(x⇒{
                //1.把数据排升序   cameraId
                 val words=sortDF.filter("cameraId =='"+x.apply(0)+"'").collect()
                //3.套用公式计算
                //Ft=(5Xt-1+4Xt-2+3Xt-3+2Xt-4+Xt-5)/15
                 var carNum:Int=6        //权重
                 var carData:Double=0    //流量值(分子)
                 var speed:Double=0      //速度值(分子)
                 var densit:Double=0     //密度值(分子)
                 var rsNum=0             //权重之和(分母)
                 words.foreach(y⇒{
                        carNum=carNum-1
                        rsNum+=carNum
                        carData+=Double.valueOf(y.apply(0).toString())*(carNum)
                        speed+=Double.valueOf(y.apply(1).toString())*(carNum)
                        densit+=Double.valueOf(y.apply(2).toString())*(carNum)
                 })
                 //保存可用数据
                  var dataMap: mutable.Map[String, String] = mutable.Map()
                  
                  val car_num:Double=Math.abs(Double.valueOf(scala.math.round(carData/rsNum)))  //流量(取整)
                  val now_avg_speed=(speed/rsNum).formatted("%.2f")                             //(速度)四舍五入保留两位小数
                  val now_avg_densit=(densit/rsNum).formatted("%.2f")                           //(密度)四舍五入保留两位小数
                  
                  //时间/摄像头/车流量/速度/密度/拥堵度
                  dataMap("key") =predlong+(x.apply(0).toString().replace("V", "")).toString()     //hbase rowkey
                  dataMap("time")=format.format(getTime.get("rowkey_pred").get)
                  dataMap("car_num")= car_num.toString()      
                  dataMap("now_avg_speed")=now_avg_speed
                  dataMap("now_avg_densit")= now_avg_densit               
                  dataMap("now_avg_TPI") =Math.abs(((1 - (Double.valueOf(now_avg_speed) / 80)) * 10)).formatted("%.2f")   //拥堵度((1-(平均速度/最高限速))*10)
                  dataMap("cameraId")=x.apply(0).toString()        
                  
                  resultData.append(dataMap)
          })
          println("resultData数量:"+resultData.size)
          
         //结果数据转rdd
          val Sectionrdd = sc.parallelize(resultData) 
         
          //rdd转为habse可存储格式
          val dataSectionRDD=Sectionrdd.map(x⇒{
              convert(x)
            })
          
          dataSectionRDD.saveAsHadoopDataset(jobConf)
        }
      })

    ssc.start()
    ssc.awaitTermination()
    println("connection关闭")
    connection.close()
  }

 	  //定义往Hbase插入数据的方法
 def convert(map: mutable.Map[String, String])= {
    //1.获取表对像
    val put = new Put(Bytes.toBytes(map.get("key").get)) //rowkey

    //车流量
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("car_num"), Bytes.toBytes(map.get("car_num").get))

    //时间
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("time"), Bytes.toBytes(map.get("time").get))
   
    //平均速度(车道速度和/车道车辆数)(平均)
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("now_avg_speed"), Bytes.toBytes(map.get("now_avg_speed").get))

    //当前车道密度(1km/平均车间距)
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("now_avg_densit"), Bytes.toBytes(map.get("now_avg_densit").get))
  
    //拥堵度((1-(平均速度/最高限速))*10)
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("now_avg_TPI"), Bytes.toBytes(map.get("now_avg_TPI").get))

    //摄像头编号
    put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("cameraId"), Bytes.toBytes(map.get("cameraId").get))

    (new ImmutableBytesWritable, put)
  } 
  
  /*
	 *   .由于没有读取HBase的Stream接口,需要一个自定义的Receiver用于查询HBase数据类
	 *   MyReceiver:自定义Receiver通过私有方法receive()方法读取HBase数据并调用store(b.toString())将数据写入DStream。
	 * 	申明采集器
	 *  1.继承Receiver 参数是存储级别
	 *  2.重写方法onStart,onStop
	 */
  class MyReceiver() extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    override def onStart(): Unit = {
      receive()
    }

    override def onStop(): Unit = {

    }

    private def receive(): Unit = {
    //1.获取连接  
     if (connection == null) {
            connection = ConnectionFactory.createConnection(conf)
      }
      
    //2.判断是否进行计算(5分钟节点开始预测)
    val elong = format.parse(thisMinute).getTime()
    thism=thisMinute
    //预测时间点
    predTime=format.format(getTime.get("rowkey_pred").get)
    val Minute = fmtminute.format(elong)
    println(Minute+"<++>时间:"+thism)
    
    if (Integer.valueOf(Minute) % 5 == 0) {
         //3.组建rowkey
        val rowkey_5 = getTime.get("rowkey_1").get+"."  //权重5
        val rowkey_4 = getTime.get("rowkey_2").get+"." //权重4
        val rowkey_3 = getTime.get("rowkey_3").get+"." //权重3
        val rowkey_2 = getTime.get("rowkey_4").get+"." //权重2
        val rowkey_1 = getTime.get("rowkey_5").get+"." //权重1
    
        //4.指定连接表
        val table = new HTable(conf, tableNameInPut)
        
        //5.组建scan多条件查询
        val scan = new Scan()
        scan.setCacheBlocks(false)//是否缓存
        scan.addFamily(Bytes.toBytes("info"))
    
        /*
    			 *   摄像头后缀获取数据,
    			* MUST_PASS_ONE: 取并集 相当于or 操作
    		  * MUST_PASS_ALL: 取交集 相当一and操作
    			*/
        val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        val rowFilter1: RowFilter = new RowFilter(
          CompareFilter.CompareOp.EQUAL,
          new RegexStringComparator(rowkey_1))
        filterList.addFilter(rowFilter1)
    
        val rowFilter2: RowFilter = new RowFilter(
          CompareFilter.CompareOp.EQUAL,
          new RegexStringComparator(rowkey_2))
        filterList.addFilter(rowFilter2)
    
        val rowFilter3: RowFilter = new RowFilter(
          CompareFilter.CompareOp.EQUAL,
          new RegexStringComparator(rowkey_3))
        filterList.addFilter(rowFilter3)
    
        val rowFilter4: RowFilter = new RowFilter(
          CompareFilter.CompareOp.EQUAL,
          new RegexStringComparator(rowkey_4))
        filterList.addFilter(rowFilter4)
    
        val rowFilter5: RowFilter = new RowFilter(
          CompareFilter.CompareOp.EQUAL,
          new RegexStringComparator(rowkey_5))
        filterList.addFilter(rowFilter5)
    
        scan.setFilter(filterList)
        
        //6.查询数据
       val rs = table.getScanner(scan)
       val iterator = rs.iterator()
    
          while (iterator.hasNext) {
              val result = iterator.next();
              val b = new StringBuilder
              b.append("{")
              b.append(""car_num":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("car_num"))) + """) //流量
              b.append(",")
              b.append(""now_avg_speed":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_speed"))) + """) //速度
              b.append(",")
              b.append(""now_avg_densit":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_densit"))) + """) //密度
              b.append(",")
              b.append(""now_avg_passTime":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_passTime"))) + """) //通行时间
              b.append(",")
              b.append(""now_avg_TPI":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_TPI"))) + """) //拥堵度
              b.append(",")
              b.append(""now_avg_space":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("now_avg_space"))) + """) //车间距
              b.append(",")
              b.append(""cameraId":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("cameraId"))) + """) //摄像头
              b.append(",")
              b.append(""time":"" + Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("time"))) + """) //时间
              b.append("}")
              //将采集的数据存贮到采集器的内部进行转换
              store(b.toString())
            }
           table.close()
      }
      restart("Trying to connect again")
    }
  }

  //获取当前分钟数
  def thisMinute(): String = {
    val edate: Calendar = Calendar.getInstance()
    val endMinute = sdf.format(edate.getTime()) + ":00"
    (endMinute)
  }

 //计算出需要拿取的数据时间节点及预测的时间点
  def getTime(): mutable.Map[String, Long] = {
    //计算出最新的5分钟时间节点
    val date: Calendar = Calendar.getInstance()
    val indexMinute = sdf.format(date.getTime()) + ":00"
    var dt: String = null
    val minute = fmtminute.format(date.getTime)
    val rs: Int = Integer.valueOf(minute) / 5
    if (Integer.valueOf(minute) % 5 != 0 && Integer.valueOf(minute) % 5 > 2) {
      val min = (rs * 5).toString()
      val builderDate = new StringBuilder(indexMinute).replace(14, 16, min)
      dt = builderDate.toString()
    } else {
      val min = ((rs * 5) - 5).toString()
      val builderDate = new StringBuilder(indexMinute).replace(14, 16, min)
      dt = builderDate.toString()
    }
    
    var map: mutable.Map[String, Long] = mutable.Map()
    //预测的时间点
    val preddate: Calendar = Calendar.getInstance()
    preddate.setTime(format.parse(dt))
    preddate.add(Calendar.MINUTE, +(2 * 5))
    val Minute_pred = sdf.format(preddate.getTime()) + ":00"
    val dlong_pred: Long = format.parse(Minute_pred).getTime()
    map("rowkey_pred") = dlong_pred

    map("rowkey_1") = format.parse(dt).getTime().longValue() //5

    val newdate: Calendar = Calendar.getInstance()
    newdate.setTime(format.parse(dt))
    newdate.add(Calendar.MINUTE, -(1 * 5))
    val Minute_2 = sdf.format(newdate.getTime()) + ":00"
    val dlong_2: Long = format.parse(Minute_2).getTime()
    map("rowkey_2") = dlong_2

    newdate.add(Calendar.MINUTE, -(1 * 5))
    val Minute_3 = sdf.format(newdate.getTime()) + ":00"
    val dlong_3: Long = format.parse(Minute_3).getTime()
    map("rowkey_3") = dlong_3

    newdate.add(Calendar.MINUTE, -(1 * 5))
    val Minute_4 = sdf.format(newdate.getTime()) + ":00"
    val dlong_4: Long = format.parse(Minute_4).getTime()
    map("rowkey_4") = dlong_4

    newdate.add(Calendar.MINUTE, -(1 * 5))
    val Minute_5 = sdf.format(newdate.getTime()) + ":00"
    val dlong_5: Long = format.parse(Minute_5).getTime()
    map("rowkey_5") = dlong_5

    (map)
  }
}

  

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
原文地址:https://www.cnblogs.com/KdeS/p/14307109.html