SparkCore 离线计算案例

  • 案例一:

       使用 SparkCore 模块监听 UDP 端口数据，经逻辑处理后存入 MySQL

        

package com.lg.bigdata.core

import org.apache.spark.SparkConf
import com.alibaba.fastjson.JSON
import java.util.LinkedHashMap
import org.apache.spark.sql.Row
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import java.util.Calendar
import java.text.SimpleDateFormat
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Table
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.Cell
import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.util.StringUtils
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable
import java.net.DatagramSocket
import java.net.DatagramPacket
import java.nio.ByteBuffer
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.CallableStatement
import com.lg.blgdata.utils.DruidConnectionPool
import scala.collection.mutable.Buffer
import java.lang.Double
import java.util.concurrent.Executors
import java.util.concurrent.Callable
import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.util.control.Breaks._
import org.apache.spark.SparkContext

/** 
 *      一:此模块的功能(排队长度预测)
 *              作用:接收哈工大的事件信息结合我们自己的平台数据做排队长度预测
 *    UDP协议可解析的数据:事件发生时间,事件点位摄像头,事件类型,车道信息等其他信息未解析出      
 *    
 *    (1)计算预测排队长度
 *    (2)计算实时排队长度
 */
object EventMonitorUdp {
	// SimpleDateFormat is not thread-safe, and these formatters are used concurrently by the
	// ThreadHGD tasks running on `pool`, so every thread gets its own instance.
	private val minuteFormatHolder: ThreadLocal[SimpleDateFormat] = new ThreadLocal[SimpleDateFormat] {
	  override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
	}
	private val secondFormatHolder: ThreadLocal[SimpleDateFormat] = new ThreadLocal[SimpleDateFormat] {
	  override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
	}

	/** Formatter for "yyyy-MM-dd HH:mm" (minute precision); returns the calling thread's own instance. */
	def sdf: SimpleDateFormat = minuteFormatHolder.get()

	/** Formatter for "yyyy-MM-dd HH:mm:ss" (second precision); returns the calling thread's own instance. */
	def format: SimpleDateFormat = secondFormatHolder.get()

	/** HBase table holding the 1-minute section data read by getHbase. */
	val TABLENAME = "jzw_data:section_1min"

	/**
	 * Cached thread pool that runs one ThreadHGD tracking task per accepted event.
	 *  - there is no practical upper bound on the number of threads;
	 *  - idle threads are terminated after 60 seconds and reclaimed automatically;
	 *  - a burst of events therefore creates a burst of threads, so watch the task count.
	 */
	val pool = Executors.newCachedThreadPool()
	
	/**
	 * Entry point: listens on UDP port 9101 for event datagrams and starts one ThreadHGD tracking
	 * task on `pool` for every accepted parking (stop) event.
	 *
	 * The loop runs until the socket itself fails. A datagram that cannot be processed (bad
	 * payload, database error, ...) is logged and skipped, and the listener keeps running. On exit
	 * the pool is shut down (already running tasks may finish), the socket is closed and the
	 * SparkContext is stopped.
	 *
	 * @param args unused
	 */
	def main(args: Array[String]): Unit = {
	  import scala.util.control.NonFatal

	  val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("EventMonitorUdp")
	  // NOTE(review): nothing below uses the SparkContext; it is kept so the job still runs as a Spark app.
	  val sc = new SparkContext(config)

	  var ds: DatagramSocket = null
	  try {
	    ds = new DatagramSocket(9101)
	    // The receive buffer is reused for every datagram; only the first dp.getLength() bytes are valid.
	    val buf: Array[scala.Byte] = new Array(700)
	    while (true) {
	      val dp = new DatagramPacket(buf, buf.length)
	      ds.receive(dp) // blocks until a datagram arrives
	      try handleEventPacket(dp.getData(), dp.getLength())
	      catch { case NonFatal(e) => e.printStackTrace() }
	    }
	  } catch {
	    case e: Exception => e.printStackTrace()
	  } finally {
	    pool.shutdown()
	    if (ds != null) ds.close()
	    sc.stop()
	  }
	}

	/**
	 * Parses one event datagram and, for an accepted parking event, starts a tracking task.
	 *
	 * Offsets read from the datagram:
	 *  - 2: protocol type (must be 1); 3: message type (must be 2, traffic event)
	 *  - 10..13: big-endian camera index (camera number = index + 1)
	 *  - 16..33: ASCII event time (see parseEventTime)
	 *  - 53: event type (1 = parking event; byte 52 is expected to be 0)
	 *  - 15: lane (1..10), currently unused
	 */
	private def handleEventPacket(data: Array[scala.Byte], length: Int): Unit = {
	  // Byte 53 is the last byte read. Shorter datagrams are ignored; otherwise stale bytes left in
	  // the reused buffer by an earlier, longer datagram would be parsed.
	  if (length >= 54 && data(2) == 1 && data(3) == 2 && data(53) == 1) {
	    cameraCode(ByteBuffer.wrap(data, 10, 4).getInt() + 1).foreach { carmID =>
	      val cameraRow = getCameraData(carmID)
	      // Only cameras configured in lenque_cam are tracked.
	      if (cameraRow.nonEmpty) {
	        val datamap: mutable.Map[String, String] = mutable.Map(
	          "uuid" -> UUID.randomUUID().toString().replaceAll("-", ""),
	          "startTime" -> parseEventTime(data),
	          "event" -> "停车事件",
	          "carmID" -> cameraRow("curr_cam"))
	        pool.execute(new ThreadHGD(cameraRow, datamap))
	      }
	    }
	  }
	}

	/** Formats a camera number as "V" plus 3 digits (7 -> "V007"); numbers outside 0..999 are rejected. */
	private def cameraCode(camId: Int): Option[String] =
	  if (camId >= 0 && camId <= 999) Some(f"V$camId%03d") else None

	/**
	 * Reads the 18 ASCII bytes at offset 16 and reformats their first 14 characters as
	 * "yyyy-MM-dd HH:mm:ss" (presumably the raw text is "yyyyMMddHHmmss..." — confirm with the sender).
	 */
	private def parseEventTime(data: Array[scala.Byte]): String = {
	  val raw = new String(data.slice(16, 34).map(_.toChar))
	  new java.lang.StringBuilder(raw)
	    .insert(4, "-").insert(7, "-").insert(10, " ").insert(13, ":").insert(16, ":")
	    .toString.substring(0, 19)
	}
	
	/**
	 * Background task that tracks one parking (stop) event until its queue warning ends.
	 *
	 * Once per minute it reads the 1-minute section data of the event's current camera (`curr_cam`)
	 * and upstream camera (`upst_cam`) from HBase, falling back to the previous minute when the
	 * current one has no row. On the first triggered pass it predicts the maximum queue length; on
	 * every triggered pass it estimates the real queue length from the upstream camera chain
	 * (`cam_7`) and inserts one tracking record through updateCameraData.
	 *
	 * The task ends when:
	 *  - the trigger condition is not met;
	 *  - the predicted queue is below 0.1; or
	 *  - the cancel condition has been met three times (the last record is written with state_len = "0").
	 *
	 * @param map     camera configuration row from getCameraData (curr_cam, upst_cam, cap_speed80,
	 *                cam_zh, cam_7, ...)
	 * @param datamap record under construction holding uuid, startTime, event and carmID; this task
	 *                adds state_len, timelen, act_len_que, pred_len_que and gz_time before each insert
	 */
	class ThreadHGD(map: mutable.Map[String, String], datamap: mutable.Map[String, String]) extends Runnable {

	  override def run(): Unit = {
	    // Set once the trigger condition has been met; never reset afterwards.
	    var triggered = false
	    // Keeps the loop alive.
	    var tracking = true
	    // (timelen, pred_len_que), computed on the first triggered pass only.
	    var prediction: Option[(String, String)] = None
	    // Number of passes that met the cancel condition (not necessarily consecutive).
	    var cancelHits = 0

	    try {
	      while (tracking) {
	        val clock = Calendar.getInstance()
	        val index1 = sdf.format(clock.getTime()) + ":00"
	        clock.add(Calendar.MINUTE, -1)
	        val index2 = sdf.format(clock.getTime()) + ":00"

	        println(index1 + "==" + index2 + "==" + map)
	        val thisValue = readMinute(map("curr_cam"), index1, index2)
	        val upstValue = readMinute(map("upst_cam"), index1, index2)

	        // Trigger: the current camera has data and the upstream camera counted more than 35 vehicles.
	        // NOTE(review): the old comment spoke of "current camera flow > 42", but the check is on the
	        // upstream camera against 35 — confirm which is intended.
	        if (thisValue.nonEmpty && upstValue.get("car_num").getOrElse("0").toDouble > 35) triggered = true
	        println("本次预警是否满足条件:   " + triggered)
	        println("当前摄像头thisValue:   " + thisValue)
	        println("上游摄像头upstValue:   " + upstValue)

	        if (!triggered) {
	          tracking = false
	        } else {
	          val (timelen, predicted) = prediction.getOrElse {
	            val computed = predictQueue(upstValue)
	            prediction = Some(computed)
	            computed
	          }
	          val predictedKm = predicted.toDouble
	          if (predictedKm < 0.1) {
	            // Predicted queue is negligible: stop without writing a record.
	            tracking = false
	          } else {
	            val realMeters = realQueueMeters(index1, index2)
	            // Relative gap between the real queue (metres -> km) and the predicted queue (km).
	            val relativeError = math.abs((realMeters / 1000 - predictedKm) / predictedKm)
	            if (relativeError <= 0.3 || realMeters <= 300) cancelHits += 1
	            if (cancelHits >= 3) tracking = false

	            datamap("state_len") = if (tracking) "1" else "0"
	            datamap("timelen") = timelen
	            datamap("act_len_que") = (realMeters / 1000.0).toString()
	            datamap("pred_len_que") = predicted
	            datamap("gz_time") = index1
	            println("datamap:  " + datamap)
	            updateCameraData(datamap)
	            if (tracking) TimeUnit.MINUTES.sleep(1) // follow up again in one minute
	          }
	        }
	      }
	    } catch {
	      // Interrupted (e.g. pool.shutdownNow): stop tracking and keep the interrupt flag set.
	      case _: InterruptedException => Thread.currentThread().interrupt()
	    }
	  }

	  /** Reads the 1-minute row of `camId` for `minute`, falling back to `previousMinute` when empty. */
	  private def readMinute(camId: String, minute: String, previousMinute: String): mutable.Map[String, String] = {
	    // Row key = epoch millis of the minute followed by the camera number without its "V" prefix.
	    def rowKey(m: String): String = format.parse(m).getTime().toString + camId.replace("V", "")
	    val current = getHbase(rowKey(minute))
	    if (current.nonEmpty) current else getHbase(rowKey(previousMinute))
	  }

	  /**
	   * Predicts (incident duration at maximum queue, maximum queue length) as strings.
	   * Intermediate values are rounded to 2 decimals (3 for the queue length) exactly like
	   * "%.2f"/"%.3f" formatting, and absolute values are taken where the model requires them.
	   */
	  private def predictQueue(upstValue: mutable.Map[String, String]): (String, String) = {
	    val capacityDuringIncident = 2900.0
	    val thisDensity = roundTo(capacityDuringIncident / 25.0, 2)
	    // Density of the section once the incident is cleared.
	    val afterDensity = roundTo(map.get("cap_speed80").getOrElse("0").toDouble / 80.0, 2)
	    // Upstream density = flow / speed; the per-minute count is scaled by 60.
	    val upstreamDensity = roundTo(
	      upstValue.get("car_num").getOrElse("0").toDouble / upstValue.get("now_avg_speed").getOrElse("0").toDouble * 60, 2)
	    // Queue-forming (w1) and discharge (w2) wave speeds: |80 * (1 - (k1 + k2) / 180)|.
	    val w1 = math.abs(roundTo(80 * (1 - (upstreamDensity + thisDensity) / 180.0), 2))
	    val w2 = math.abs(roundTo(80 * (1 - (afterDensity + thisDensity) / 180.0), 2))
	    // Incident duration at maximum queue: w2 * (13/60) / (w1 - w2).
	    val t = math.abs(roundTo(w2 * (13 / 60.0) / (w1 - w2), 2))
	    // Maximum (predicted) queue length: w2 * (t - 13/60).
	    val x = math.abs(roundTo(w2 * (t - 13 / 60.0), 3))
	    (fmt(t, 2), x.toString)
	  }

	  /**
	   * Estimates the real queue length in metres from the upstream camera chain in `cam_7`.
	   *
	   * Entries are separated by "、", and each entry is split on "-" into a camera id and a stake
	   * value. The walk stops at the first camera that is not congested (speed >= 20 or spacing >= 15).
	   * Returns 300 when no chain is configured.
	   */
	  private def realQueueMeters(index1: String, index2: String): scala.Double =
	    map.get("cam_7").flatMap(Option(_)) match {
	      case None => 300.0
	      case Some(cams) =>
	        val startDist = map.get("cam_zh").getOrElse("0").toDouble // stake of this camera
	        val entries = cams.split("、").iterator
	        var total = 0.0
	        var congested = true
	        while (congested && entries.hasNext) {
	          val parts = entries.next().split("-")
	          val found = readMinute(parts(0), index1, index2)
	          // Without data for either minute, assume free flow (speed 60, spacing 60), which ends the walk.
	          val readings =
	            if (found.nonEmpty) found
	            else mutable.Map("now_avg_space" -> "60", "now_avg_speed" -> "60")
	          val speed = readings.get("now_avg_speed").getOrElse("0").toDouble
	          val space = readings.get("now_avg_space").getOrElse("0").toDouble
	          if (speed < 20 && space < 15) {
	            // NOTE(review): adds the full distance from this camera to each congested upstream
	            // camera, so the contributions overlap as the walk proceeds — confirm the intent.
	            total += startDist - parts(1).toDouble
	          } else {
	            congested = false
	          }
	        }
	        total
	    }

	  /** Formats `v` with `scale` decimals in a fixed locale, so the text always parses back. */
	  private def fmt(v: scala.Double, scale: Int): String = s"%.${scale}f".formatLocal(java.util.Locale.ROOT, v)

	  /** Rounds `v` to `scale` decimals exactly as "%.<scale>f" formatting does. */
	  private def roundTo(v: scala.Double, scale: Int): scala.Double = fmt(v, scale).toDouble
	}
	
  /**
   * Decodes a comma-separated list of decimal character codes into a string,
   * e.g. "50,48,50,48," -> "2020".
   *
   * Tokens are trimmed. Empty tokens, such as the one after a trailing comma or an empty input,
   * are ignored instead of failing.
   *
   * @param value comma-separated decimal character codes
   * @return the decoded string
   * @throws NumberFormatException if a non-empty token is not a decimal integer
   */
  def asciiToString(value: String): String =
    value.split(",").iterator
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(code => Integer.parseInt(code).toChar)
      .mkString

	
	
	/**
	 * Reads one row of TABLENAME from HBase and returns its requested `info:` columns.
	 *
	 * Only car_num (flow), now_avg_speed (speed), time, now_avg_densit (density) and now_avg_space
	 * (vehicle spacing) are requested. Failures are printed and yield whatever was read so far
	 * (normally nothing), so callers treat "error" and "no row" alike.
	 *
	 * @param carId HBase row key
	 * @return column qualifier -> value; empty when the row does not exist
	 */
	def getHbase(carId: String): mutable.Map[String, String] = {
	  import scala.collection.JavaConverters._
	  import scala.util.control.NonFatal

	  val row: mutable.Map[String, String] = mutable.Map()
	  val family = Bytes.toBytes("info")
	  try {
	    // NOTE(review): creating an HBase connection per call is expensive (it is meant to be created
	    // once and shared); kept per call here, but now always closed, even when the read fails.
	    val hConn = ConnectionFactory.createConnection(HBaseConfiguration.create())
	    try {
	      val hTable = hConn.getTable(TableName.valueOf(TABLENAME))
	      try {
	        val get = new Get(Bytes.toBytes(carId))
	        Seq("car_num", "now_avg_speed", "time", "now_avg_densit", "now_avg_space")
	          .foreach(column => get.addColumn(family, Bytes.toBytes(column)))
	        val result = hTable.get(get)
	        // listCells() is null for an empty result, so only parse non-empty results.
	        if (!result.isEmpty()) {
	          result.listCells().asScala.foreach { cell =>
	            row(Bytes.toString(CellUtil.cloneQualifier(cell))) = Bytes.toString(CellUtil.cloneValue(cell))
	          }
	        }
	      } finally hTable.close()
	    } finally hConn.close()
	  } catch {
	    case NonFatal(e) => e.printStackTrace()
	  }
	  row
	}
	/**
	 * Inserts one queue-length tracking record into MySQL table jzwdata.warning_record_sum.
	 *
	 * Required keys: uuid, startTime, carmID, timelen, act_len_que, pred_len_que and gz_time.
	 * Optional keys: state_len (default "0") and event (default "停车事件"). Failures during the
	 * insert are printed and the transaction is rolled back instead of being rethrown.
	 *
	 * @param map column values, keyed as produced by the ThreadHGD tracking task
	 */
	def updateCameraData(map: mutable.Map[String, String]): Unit = {
	  import scala.util.control.NonFatal

	  val sql: String = "insert into jzwdata.warning_record_sum(uuid,state_len,warning_event,happen_time,location_cam,timelen,act_len_que,pred_len_que,gz_time) values(?,?,?,?,?,?,?,?,?)"
	  var connection: Connection = null
	  var pstm: PreparedStatement = null
	  try {
	    connection = DruidConnectionPool.getConnection
	    connection.setAutoCommit(false) // explicit transaction
	    pstm = connection.prepareStatement(sql)
	    pstm.setString(1, map("uuid"))                        // task group id
	    pstm.setString(2, map.getOrElse("state_len", "0"))    // 0: warning cleared, 1: warning active
	    pstm.setString(3, map.getOrElse("event", "停车事件"))   // event type
	    pstm.setString(4, map("startTime"))                   // time the event occurred
	    pstm.setString(5, map("carmID"))                      // location (camera number)
	    pstm.setString(6, map("timelen"))                     // duration so far
	    pstm.setString(7, map("act_len_que"))                 // current (real) queue length
	    pstm.setString(8, map("pred_len_que"))                // predicted queue length
	    pstm.setString(9, map("gz_time"))                     // tracking timestamp
	    pstm.executeUpdate()
	    connection.commit()
	  } catch {
	    case NonFatal(e) =>
	      e.printStackTrace()
	      // The connection is null when getConnection itself failed: nothing to roll back.
	      if (connection != null) {
	        try connection.rollback()
	        catch { case NonFatal(rollbackError) => rollbackError.printStackTrace() }
	      }
	  } finally {
	    if (pstm != null) pstm.close()
	    if (connection != null) connection.close() // hands the connection back to the pool
	  }
	}
	
	/**
	 * Looks up the queue-length camera configuration of one event camera in MySQL table lenque_cam.
	 *
	 * @param carmID camera code from the event datagram (matched against cam_ID_ac), e.g. "V007"
	 * @return the row as a map with these keys (empty when no row matches; values are null where
	 *         the column is NULL):
	 *          - cam_ID_ac: event camera
	 *          - curr_cam: current section camera
	 *          - upst_cam: upstream section camera
	 *          - down_cam: downstream section camera
	 *          - cap_speed80: capacity at speed 80
	 *          - cam_zh: stake
	 *          - cam_7: upstream camera chain
	 * @throws RuntimeException wrapping the underlying failure when the query fails
	 */
	def getCameraData(carmID: String): mutable.Map[String, String] = {
	  val querySql = "select cam_ID_ac,curr_cam,upst_cam,down_cam,cap_speed80,cam_zh,cam_7 from lenque_cam where cam_ID_ac =? limit 1;"
	  // Same order as the select list (JDBC columns are 1-based).
	  val columns = Seq("cam_ID_ac", "curr_cam", "upst_cam", "down_cam", "cap_speed80", "cam_zh", "cam_7")
	  val row: mutable.Map[String, String] = mutable.Map()
	  var connection: Connection = null
	  var statement: PreparedStatement = null
	  var resultSet: ResultSet = null
	  try {
	    connection = DruidConnectionPool.getConnection
	    statement = connection.prepareStatement(querySql)
	    statement.setString(1, carmID)
	    resultSet = statement.executeQuery()
	    // "limit 1": at most one row.
	    if (resultSet.next()) {
	      columns.zipWithIndex.foreach { case (column, i) => row.put(column, resultSet.getString(i + 1)) }
	    }
	  } catch {
	    case e: Exception =>
	      throw new RuntimeException("查询摄像头配置(lenque_cam)出现异常: cam_ID_ac=" + carmID, e)
	  } finally {
	    if (resultSet != null) resultSet.close()
	    if (statement != null) statement.close()
	    if (connection != null) connection.close()
	  }
	  row
	}
}

  

  • 案例二:

       使用 SparkCore 模块从 Redis 拉取数据，处理后存入 HBase 数据库

        

package com.lg.bigdata.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import redis.clients.jedis.Jedis
import java.util.Set
import redis.clients.jedis.ScanParams
import redis.clients.jedis.ScanResult
import org.apache.spark.rdd.RDD
import com.google.gson.Gson
import java.util.Map
import org.apache.hadoop.hbase.util.Bytes
import com.google.gson.reflect.TypeToken
import scala.collection.Seq
import scala.collection.mutable.HashMap
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.DataFrame
import scala.collection.mutable
import com.lg.blgdata.utils.JedisConnectionPool
import java.lang.Long
import java.util.Calendar
import java.lang.Double
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import com.lg.bigdata.utils.JZWUtil

/**
 *    一.1分钟数合成
 * 	从Redis合成分钟数据存入hbase
 *         要求:
 *         (1)断面分类
 *                       (2)车道划分
 *
 *  二.原始数据格式
 *         C:摄像头编号
 *         T:时间
 *         L:车道(L→左,M→中,R→右)
 *         N:车型(car→小轿车,bus→卡车)
 *         S1:速度(默认km/h)
 *         S2:车间距(默认m)
 */
object MinuteData {
  /** Minute-precision formatter ("yyyy-MM-dd HH:mm"), used to truncate the clock to whole minutes. */
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")

  /** Second-precision formatter ("yyyy-MM-dd HH:mm:ss"), used to parse minute strings back to millis. */
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  /** HBase table that receives the per-lane 1-minute rows. */
  val tableName = "jzw_data:spot_jt_para_1min"

  /** HBase client configuration for this job, with its input table set to `tableName`. */
  val conf = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
    hbaseConf
  }
  
  /**
   * Builds the per-lane 1-minute rows for the previous whole minute and writes them to `tableName`.
   *
   *  1. Window: [start of the previous minute, start of the current minute), derived from a single
   *     clock reading so it is always exactly one minute wide.
   *  2. For every configured camera, read its newest raw records from Redis db 0 and aggregate them
   *     per lane (L/M/R) into one row (see buildLaneRow).
   *  3. Per lane, fill the queue length "nwo_len_que" (see fillQueueLength).
   *  4. Write all rows (L rows, then M, then R) through JZWUtil.convert(row, 1).
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MinuteData")
    val sc = new SparkContext(config)
    try {
      val endMinute = sdf.format(Calendar.getInstance().getTime()) + ":00"
      val elong = format.parse(endMinute).getTime()
      val slong = elong - 60 * 1000L

      val rowsByLane = collectMinuteRows(slong, elong, endMinute)
      rowsByLane.foreach(fillQueueLength)

      val rdd = sc.parallelize(rowsByLane.flatten.toList)

      val jobConf = new JobConf(conf)
      jobConf.setOutputFormat(classOf[TableOutputFormat]) // output format of the written data
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

      rdd.foreach(println)
      val dataRDD = rdd.map(x => JZWUtil.convert(x, 1))
      dataRDD.saveAsHadoopDataset(jobConf)
    } finally {
      sc.stop()
    }
  }

  /** Lane letters of the raw "L" field, paired with the digit used in the HBase row key. */
  private val LaneCodes = Seq("L" -> "1", "M" -> "2", "R" -> "3")

  /** One vehicle observation taken from a raw Redis record (speed S1, spacing S2; 0 when absent). */
  private final case class Sample(lane: String, carType: String, speed: scala.Double, spacing: scala.Double)

  /** Running totals for one lane of one camera during the minute window. */
  private final class LaneAcc {
    var count = 0
    var pcu = 0
    var speedSum = 0.0
    var spacingSum = 0.0

    def add(s: Sample): Unit = {
      count += 1
      speedSum += s.speed
      spacingSum += s.spacing
      // PCU weights: car = 1, bus = 2; other vehicle types add no PCU.
      if (s.carType == "car") pcu += 1 else if (s.carType == "bus") pcu += 2
    }
  }

  /**
   * Parses one raw JSON record and keeps it only if its timestamp "T" lies in [slong, elong).
   * Malformed records (bad JSON, missing T/L/N, non-numeric fields) are skipped instead of
   * aborting the whole minute.
   */
  private def parseSample(gson: Gson, recordType: java.lang.reflect.Type, json: String,
                          slong: scala.Long, elong: scala.Long): Option[Sample] =
    scala.util.Try {
      // Values are read as strings so large timestamps are not coerced to floating point.
      val rec: java.util.Map[String, String] = gson.fromJson(json, recordType)
      val time = java.lang.Long.parseLong(rec.get("T"))
      if (time > 0 && slong <= time && time < elong) {
        def num(key: String): scala.Double =
          Option(rec.get(key)).map(v => java.lang.Double.parseDouble(v)).getOrElse(0.0)
        Some(Sample(rec.get("L").toString, rec.get("N").toString, num("S1"), num("S2")))
      } else None
    }.toOption.flatten

  /**
   * Reads every configured camera's Redis list and returns its lane rows, grouped in LaneCodes
   * order (L, M, R). The Jedis connection is always returned.
   */
  private def collectMinuteRows(slong: scala.Long, elong: scala.Long, endMinute: String)
      : Seq[ListBuffer[mutable.Map[String, String]]] = {
    import scala.collection.JavaConverters._

    val rowsByLane = LaneCodes.map(_ => new ListBuffer[mutable.Map[String, String]]())
    val gson = new Gson()
    val recordType = new TypeToken[java.util.Map[String, String]]() {}.getType()
    val jedis: Jedis = JedisConnectionPool.getConnections()
    try {
      jedis.select(0)
      JZWUtil.getCameraId.foreach { camera =>
        // Keep the 201 newest entries of this camera's list, then read the 151 newest.
        jedis.ltrim(camera, 0, 200)
        val records = jedis.lrange(camera, 0, 150).asScala
        val carMaps = JZWUtil.contentMatch(camera)
        if (carMaps != null) { // camera is configured
          val unitLength = java.lang.Double.parseDouble(carMaps.get("dZLength").get.toString()) // detection unit length
          val accs = LaneCodes.map(_ => new LaneAcc)
          records.foreach { json =>
            parseSample(gson, recordType, json, slong, elong).foreach { sample =>
              val lane = LaneCodes.indexWhere(_._1 == sample.lane)
              if (lane >= 0) accs(lane).add(sample)
            }
          }
          for (i <- LaneCodes.indices if accs(i).count > 0) {
            val (letter, digit) = LaneCodes(i)
            rowsByLane(i) += buildLaneRow(camera, letter, digit, accs(i), unitLength, elong, endMinute)
          }
        }
      }
    } finally {
      jedis.close()
    }
    rowsByLane
  }

  /**
   * Builds the output row of one lane of one camera.
   *
   * Row key = camera number without "V" + lane digit (L:1, M:2, R:3) + minute epoch millis,
   * e.g. "00111607048520000". The same formulas are used for every lane.
   */
  private def buildLaneRow(camera: String, lane: String, laneDigit: String, acc: LaneAcc,
                           unitLength: scala.Double, elong: scala.Long,
                           endMinute: String): mutable.Map[String, String] = {
    // Fixed locale, so these values always parse back with Double.parseDouble.
    def f2(v: scala.Double): String = "%.2f".formatLocal(java.util.Locale.ROOT, v)
    val avgSpeed = acc.speedSum / acc.count
    val avgSpacing = acc.spacingSum / acc.count
    mutable.Map(
      "key" -> (camera.replace("V", "") + laneDigit + elong),
      "car_num" -> acc.count.toString,                                        // flow
      "time" -> endMinute,                                                    // e.g. 2020-12-02 19:49:00
      "lane_position" -> lane,
      "now_flow_pcu" -> acc.pcu.toString,                                     // flow in PCU
      "now_avg_speed" -> f2(avgSpeed),
      "now_avg_densit" -> f2(1 / (avgSpacing / 1000)),                        // 1 km / average spacing
      "now_avg_densit_pcu" -> f2(1 / (acc.spacingSum / acc.pcu / 1000)),      // same, per PCU
      "now_avg_passTime" -> scala.math.round(unitLength / avgSpeed * 60).toString, // unit length / avg speed * 60
      "now_avg_TPI" -> math.abs(scala.math.round((1 - avgSpeed / 80) * 10)).toString, // (1 - avg/80) * 10
      "now_avg_space" -> f2(avgSpacing),
      "cameraId" -> camera)
  }

  /**
   * Fills "nwo_len_que" on every row of one lane. The key is spelled as-is; it is presumably an
   * existing HBase column name.
   *
   * A row is queued when its own section AND its adjacent upstream section (carMaps "cName", looked
   * up in the same lane) both have speed < 5 and spacing < 10. The queue length is then
   * (1 - speed / 80) * cDLength; otherwise "0".
   * NOTE(review): the design note says "upstream average speed", but the current section's speed is
   * used — confirm. Lane R used to multiply by 1000 while L and M did not; all lanes now use the
   * L/M formula — confirm the intended unit of cDLength.
   */
  private def fillQueueLength(rows: ListBuffer[mutable.Map[String, String]]): Unit = {
    def num(row: mutable.Map[String, String], key: String): scala.Double = java.lang.Double.parseDouble(row(key))
    def jammed(row: mutable.Map[String, String]): Boolean =
      num(row, "now_avg_speed") < 5 && num(row, "now_avg_space") < 10

    rows.foreach { row =>
      val carMaps = JZWUtil.contentMatch(row("cameraId"))
      val upstreamAlsoJammed = jammed(row) &&
        Option(carMaps).flatMap(_.get("cName")).flatMap(Option(_)).exists { upstream =>
          rows.exists(other => other("cameraId") == upstream && jammed(other))
        }
      row("nwo_len_que") =
        if (upstreamAlsoJammed) {
          val distance = java.lang.Double.parseDouble(carMaps.get("cDLength").get.toString)
          "%.2f".formatLocal(java.util.Locale.ROOT, (1 - num(row, "now_avg_speed") / 80) * distance)
        } else "0"
    }
  }
  
   //查询指定库的key
  /**
   * Prints every key of Redis database `db`, one per line.
   *
   * Uses incremental SCAN instead of KEYS, so a large keyspace does not block the Redis server.
   * Keys are de-duplicated because SCAN may return a key more than once. The connection is
   * always returned.
   *
   * @param db Redis database index (0-15 on a default server)
   */
  def getRedisKey(db: Int): Unit = {
    import scala.collection.JavaConverters._

    val jedis: Jedis = JedisConnectionPool.getConnections()
    try {
      jedis.select(db)
      val seen = mutable.LinkedHashSet[String]()
      val params = new ScanParams().count(1000)
      var cursor = ScanParams.SCAN_POINTER_START
      do {
        val page: ScanResult[String] = jedis.scan(cursor, params)
        page.getResult().asScala.foreach(key => if (seen.add(key)) println(key))
        cursor = page.getStringCursor()
      } while (cursor != ScanParams.SCAN_POINTER_START)
    } finally {
      jedis.close()
    }
  }
}

  

  • 案例三:

       使用 SparkCore 从 HBase 拉取数据，合成后存入 HBase 的另一个表

        

package com.lg.bigdata.core

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.text.SimpleDateFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.spark.rdd.RDD
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import redis.clients.jedis.ScanResult
import java.util.Calendar
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.hadoop.hbase.filter.RowFilter
import org.apache.hadoop.hbase.filter.Filter
import org.apache.hadoop.hbase.filter.BinaryPrefixComparator
import com.lg.bigdata.utils.JZWUtil
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import scala.collection.mutable
import org.apache.hadoop.hbase.filter.FilterList
import scala.collection.mutable.ListBuffer
import java.lang.Double
import org.apache.hadoop.mapred.JobConf
import java.util.Date
import java.lang.Long
import org.apache.hadoop.hbase.client.Put

/**
 * 1. 5-minute lane-level prediction.
 *
 * Reads the five most recent 5-minute lane records per camera from HBase table
 * `jzw_data:spot_jt_para_5min`, forecasts the next slot with a weighted moving
 * average and writes the forecast rows to `jzw_data:spot_jt_para_5min_pred`.
 * Forecast fields:
 *   (1) flow              (car_num)
 *   (2) speed             (now_avg_speed)
 *   (3) density           (now_avg_densit)
 *   (4) congestion index  (now_avg_TPI)
 */
object PredMinute5Data {

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val fmtminute = new SimpleDateFormat("mm")

  // HBase tables and shared configuration
  val tableNameOutPut = "jzw_data:spot_jt_para_5min_pred" // 5-minute prediction output
  val tableNameInPut = "jzw_data:spot_jt_para_5min"
  val conf = HBaseConfiguration.create()

  // Input table read by TableInputFormat
  conf.set(TableInputFormat.INPUT_TABLE, tableNameInPut)

  /** Lane position -> digit inserted into the output rowkey (output order: L, M, R). */
  private val LaneCodes: Seq[(String, String)] = Seq("L" -> "1", "M" -> "2", "R" -> "3")

  /** Speed limit (km/h) used by the congestion index (1 - speed / limit) * 10. */
  private val SpeedLimit = 80.0

  /** The columns of one 5-minute lane row that this job reads (raw HBase strings). */
  private final case class LaneSample(
      cameraId: String,
      lane: String,
      time: String,
      carNum: String,
      speed: String,
      density: String) {
    /** True when no column is missing; incomplete rows cannot be forecast and are skipped. */
    def isComplete: Boolean = productIterator.forall(_ != null)
  }

  def main(args: Array[String]): Unit = {
    //.setMaster("local[*]")
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("PredMinute5Data")

    // Spark context
    val sc = new SparkContext(config)
    try {
      val listId = JZWUtil.getCameraId

      // Snapshot the time window once; calling getTime() repeatedly could cross a
      // 5-minute boundary and mix rowkeys/prediction times from different windows.
      val times = getTime()
      val predTime: Long = times("rowkey_pred")

      conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(buildScan(times)))
      val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
        conf,                            // configuration
        classOf[TableInputFormat],       // input format
        classOf[ImmutableBytesWritable], // rowkey type
        classOf[Result])                 // value type

      // Extract the needed columns on the executors and collect them to the driver once.
      // (Appending to broadcast buffers inside hbaseRDD.foreach does not propagate back
      // to the driver outside local mode, and concurrent tasks raced on a ListBuffer.)
      val samples: Array[LaneSample] = hbaseRDD.map { case (_, result) =>
        def col(name: String): String =
          Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes(name)))
        LaneSample(col("cameraId"), col("lane_position"), col("time"),
          col("car_num"), col("now_avg_speed"), col("now_avg_densit"))
      }.filter(_.isComplete).collect()

      val byCameraLane = samples.groupBy(s => (s.cameraId, s.lane))

      // All L rows (in camera order), then all M rows, then all R rows.
      val predictions = new ListBuffer[mutable.Map[String, String]]()
      LaneCodes.foreach { case (lane, code) =>
        listId.foreach(cameraId => {
          byCameraLane.get((cameraId, lane)).foreach { laneSamples =>
            predictions.append(predictLane(cameraId, lane, code, laneSamples, predTime))
          }
        })
      }

      val rdd = sc.parallelize(predictions)
      println("========")
      rdd.foreach(println)
      val dataRDD = rdd.map(x => convert(x))

      val jobConf = new JobConf(conf)
      jobConf.setOutputFormat(classOf[TableOutputFormat]) // output format
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableNameOutPut)

      dataRDD.saveAsHadoopDataset(jobConf)
    } finally {
      sc.stop()
    }
  }

  /**
   * Scan of the `info` family keeping rows whose key matches any of the five input
   * timestamps (regex "." + timestamp, OR-combined via MUST_PASS_ONE).
   */
  private def buildScan(times: mutable.Map[String, Long]): Scan = {
    val scan = new Scan()
    scan.setCacheBlocks(false)
    scan.addFamily(Bytes.toBytes("info"))

    // MUST_PASS_ONE: union (OR); MUST_PASS_ALL would be intersection (AND).
    val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
    (5 to 1 by -1).foreach { i =>
      filterList.addFilter(new RowFilter(
        CompareFilter.CompareOp.EQUAL,
        new RegexStringComparator("." + times("rowkey_" + i))))
    }
    scan.setFilter(filterList)
    scan
  }

  /**
   * Builds the prediction row of one camera/lane.
   *
   * Samples are ordered newest first and weighted 5, 4, 3, 2, 1:
   *   Ft = (5·X(t-1) + 4·X(t-2) + 3·X(t-3) + 2·X(t-4) + X(t-5)) / (sum of the weights used)
   * so a lane with fewer than five samples is still normalised correctly. Samples beyond
   * the fifth (not expected from the scan) are ignored instead of getting weights <= 0.
   */
  private def predictLane(cameraId: String, lane: String, code: String,
                          laneSamples: Seq[LaneSample], predTime: Long): mutable.Map[String, String] = {
    // Time descending: the newest sample receives the largest weight.
    val newestFirst = laneSamples.sortBy(s => format.parse(s.time).getTime)(Ordering[scala.Long].reverse)
    val weighted = newestFirst.zip(5 to 1 by -1)
    val weightSum = weighted.map(_._2).sum

    def forecast(field: LaneSample => String): scala.Double =
      weighted.map { case (s, w) => field(s).toDouble * w }.sum / weightSum

    val speed = fmt2(forecast(_.speed)) // speed, 2 decimals

    val row: mutable.Map[String, String] = mutable.Map()
    row("key") = cameraId.replace("V", "") + code + predTime // HBase rowkey
    row("time") = format.format(predTime)
    row("car_num") = Math.abs(scala.math.round(forecast(_.carNum)).toDouble).toString // flow, rounded
    row("now_avg_speed") = speed
    row("now_avg_densit") = fmt2(forecast(_.density)) // density, 2 decimals
    // Congestion index (1 - speed / limit) * 10 from the rounded speed, for every lane alike.
    // NOTE(review): abs() turns speeds above the limit into positive congestion; clamping
    // at 0 may be what is intended — confirm.
    row("now_avg_TPI") = fmt2(Math.abs((1 - speed.toDouble / SpeedLimit) * 10))
    row("cameraId") = cameraId
    row("lane_position") = lane
    row
  }

  /** Two-decimal formatting ("%.2f") with '.' as separator regardless of the JVM default locale. */
  private def fmt2(v: scala.Double): String = "%.2f".formatLocal(java.util.Locale.ROOT, v)

  /**
   * Computes the input time points and the prediction time point (epoch millis).
   *
   * The reference slot is the latest 5-minute boundary if it is at least 3 minutes in
   * the past, otherwise the boundary before it (NOTE(review): presumably to wait until
   * the 5-minute aggregate has been written — confirm).
   *
   * @return rowkey_1 = reference slot (newest input) … rowkey_5 = 20 minutes earlier,
   *         rowkey_pred = reference slot + 10 minutes
   */
  def getTime(): mutable.Map[String, Long] = {
    val cal: Calendar = Calendar.getInstance()
    cal.set(Calendar.SECOND, 0)
    cal.set(Calendar.MILLISECOND, 0)
    val minute = cal.get(Calendar.MINUTE)
    val stepBack = if (minute % 5 > 2) minute % 5 else minute % 5 + 5
    cal.add(Calendar.MINUTE, -stepBack)

    val latest: scala.Long = cal.getTimeInMillis
    val slot = 5 * 60 * 1000L

    val map: mutable.Map[String, Long] = mutable.Map()
    map("rowkey_pred") = latest + 2 * slot
    for (i <- 1 to 5) {
      map("rowkey_" + i) = latest - (i - 1) * slot
    }
    map
  }

  /**
   * Converts one prediction row into an HBase Put on family `info`
   * (rowkey = map("key")); every listed column must be present in the map.
   */
  def convert(map: mutable.Map[String, String]) = {
    val put = new Put(Bytes.toBytes(map("key")))
    // flow, time, lane, average speed, density, congestion index, camera id
    Seq("car_num", "time", "lane_position", "now_avg_speed", "now_avg_densit", "now_avg_TPI", "cameraId")
      .foreach(column => put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes(column), Bytes.toBytes(map(column))))
    (new ImmutableBytesWritable, put)
  }
}

  

  • 案例四:

       使用SparkCore模块从Redis拉取数据,经过逻辑处理后再存回Redis

        

package com.lg.blgdata.core

import java.text.SimpleDateFormat
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.SparkConf
import java.util.Calendar
import scala.collection.mutable
import java.util.Date
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import com.lg.blgdata.utils.JedisConnectionPool
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.DataFrame
import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
import org.apache.spark.sql.Row
import java.util.LinkedHashMap
import com.alibaba.fastjson.JSON
import org.apache.spark.SparkContext
import scala.collection.mutable.ListBuffer
import java.lang.Long
import java.lang.Double
import redis.clients.jedis.Pipeline
import redis.clients.jedis.Jedis

/**
 * Scheduled job — short-term flow prediction.
 *
 * Reads the per-minute flow counts of the last five minutes for cameras MV005
 * (left line) and MV158 (right line) from Redis db 3, fits a least-squares line
 * S = a + b·x (x = 1 for the first minute of the window, 5 for the last), and writes
 * the predicted flow of the next five one-minute slots (x = 7..11, the current minute
 * being x = 6) to the Redis hashes "predV005" and "predV158".
 */
object WriteRedis {
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
  val fmtminute = new SimpleDateFormat("mm")
  val hourSdf = new SimpleDateFormat("yyyy-MM-dd HH")
  val daysdf = new SimpleDateFormat("yyyy-MM-dd")
  val fmtScornd = new SimpleDateFormat("ss")

  /** Length of the input window and of the prediction horizon, in minutes. */
  private val WindowMinutes = 5

  /** Regression abscissae of the predicted minutes rowkey_1..rowkey_5. */
  private val PredictX: List[Int] = List(7, 8, 9, 10, 11)

  def main(args: Array[String]): Unit = {
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WriteRedis")

    // Spark context (the computation below does not use it; it is only stopped at the end)
    val sc = new SparkContext(config)

    // Pooled Redis connection, released in finally even if reading fails
    val jedis: Jedis = JedisConnectionPool.getConnections()
    try {
      jedis.select(3) // db index (16 databases by default)

      // One clock snapshot so the input window and the predicted timestamps agree.
      val times = getTime()
      val left = predict(readWindow(jedis, "MV005", times), times)  // left line
      val right = predict(readWindow(jedis, "MV158", times), times) // right line
      writePredictions(jedis, left, right)
    } finally {
      if (jedis != null) {
        jedis.close()
      }
      sc.stop()
    }
  }

  /**
   * Returns the counts of hash `carId` whose minute key ("yyyy-MM-dd HH:mm") lies in
   * [sdate, edate) as (x, count) pairs, x = 1 for the first minute of the window.
   * x is derived from the timestamp, so a missing minute leaves a gap instead of
   * shifting the later samples.
   */
  private def readWindow(jedis: Jedis, carId: String, times: mutable.Map[String, Long]): List[(Int, Int)] = {
    import scala.collection.JavaConverters._
    val start: scala.Long = times("sdate")
    val end: scala.Long = times("edate")
    jedis.hgetAll(carId).asScala.toList.flatMap { case (minuteKey, count) =>
      val t: scala.Long = format.parse(minuteKey + ":00").getTime()
      if (start <= t && t < end) List((((t - start) / 60000L).toInt + 1, Integer.parseInt(count)))
      else Nil
    }
  }

  /**
   * Ordinary least squares y = a + b·x over `samples`, evaluated at PredictX.
   * As before, b, mean(y) and a are rounded to two decimals before use. With all five
   * minutes present this is exactly b = (5·Σxy − Σx·Σy) / 50 and a = mean(y) − 3b.
   *
   * @return (prediction time "yyyy-MM-dd HH:mm:ss", |round(a + b·x)|) for rowkey_1..rowkey_5
   */
  private def predict(samples: List[(Int, Int)], times: mutable.Map[String, Long]): List[(String, String)] = {
    val n = samples.size.toLong
    val sumX = samples.map(_._1.toLong).sum
    val sumY = samples.map(_._2.toLong).sum
    val sumXY = samples.map { case (x, y) => x.toLong * y }.sum
    val sumXX = samples.map { case (x, _) => x.toLong * x }.sum
    val denominator = n * sumXX - sumX * sumX

    // Fewer than two samples: no slope can be fitted, fall back to a flat line.
    val b = if (denominator == 0) 0.0 else round2((n * sumXY - sumX * sumY).toDouble / denominator)
    val meanY = if (n == 0) 0.0 else round2(sumY.toDouble / n)
    val meanX = if (n == 0) 0.0 else sumX.toDouble / n
    val a = round2(meanY - meanX * b)

    PredictX.zipWithIndex.map { case (x, i) =>
      val time = format.format(times("rowkey_" + (i + 1)))
      (time, Math.abs(scala.math.round(a + b * x)).toString)
    }
  }

  /** Rounds like "%.2f" but independent of the JVM default locale (always '.' separator). */
  private def round2(v: scala.Double): scala.Double =
    "%.2f".formatLocal(java.util.Locale.ROOT, v).toDouble

  /**
   * Writes both prediction lists inside one MULTI/EXEC transaction on a pipeline.
   * Failures are printed and the transaction is discarded; they are not rethrown.
   */
  private def writePredictions(jedis: Jedis, left: List[(String, String)], right: List[(String, String)]): Unit = {
    var pipeline: Pipeline = null
    try {
      pipeline = jedis.pipelined()
      pipeline.multi()
      left.foreach { case (time, sum) => pipeline.hset("predV005", time, sum) }
      right.foreach { case (time, sum) => pipeline.hset("predV158", time, sum) }
      // Queue EXEC, then flush the pipeline and read all replies.
      pipeline.exec()
      pipeline.sync()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        if (pipeline != null) {
          // Abandon the queued commands; DISCARD itself may fail (e.g. EXEC already queued).
          try pipeline.discard() catch { case d: Exception => d.printStackTrace() }
        }
    } finally {
      if (pipeline != null) {
        pipeline.close()
      }
    }
  }

  def handlerMessageRow(jsonStr: String): Row = {
    import scala.collection.JavaConverters._
    val array = JSON.parseObject(jsonStr, classOf[LinkedHashMap[String, Object]]).asScala.values.map(x => String.valueOf(x)).toArray
    Row(array: _*)
  }

  /**
   * Computes the input window and the predicted time points (epoch millis) from a single
   * clock reading truncated to the minute ("now"):
   *   rowkey_1..rowkey_5 = now + 1 .. now + 5 minutes (predicted slots)
   *   sdate = now - 5 minutes, edate = now (input window [sdate, edate))
   */
  def getTime(): mutable.Map[String, Long] = {
    val cal: Calendar = Calendar.getInstance()
    cal.set(Calendar.SECOND, 0)
    cal.set(Calendar.MILLISECOND, 0)
    val now: scala.Long = cal.getTimeInMillis
    val minuteMs = 60 * 1000L

    val map: mutable.Map[String, Long] = mutable.Map()
    for (i <- 1 to WindowMinutes) {
      map("rowkey_" + i) = now + i * minuteMs
    }
    map("edate") = now
    map("sdate") = now - WindowMinutes * minuteMs
    map
  }

}

  

        

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
原文地址:https://www.cnblogs.com/KdeS/p/14307197.html