A Spark Streaming example: consume Kafka data, compute traffic counts, and write the results to Redis

package com.lg.blgdata.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.Date
import java.lang.Long
import scala.collection.mutable
import redis.clients.jedis.Jedis
import redis.clients.jedis.Pipeline
import com.lg.blgdata.utils.JedisConnectionPool
import com.lg.bigdata.utils.JZWUtil

/**
 * 1. Create the driver (stateless).
 *    Push real-time traffic counts (5-minute and 1-day) from Kafka into Redis.
 */
object KafkaRedis {
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm")
  val hourSdf = new SimpleDateFormat("yyyy-MM-dd HH")
  val daysdf = new SimpleDateFormat("yyyy-MM-dd")
  val fmtScornd = new SimpleDateFormat("ss")

  def main(args: Array[String]): Unit = {
    val groupId = "jwz"

    //1. Create the SparkConf and initialize the StreamingContext
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("CarCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.sparkContext.setLogLevel("WARN")

    /* 2. Define the Kafka parameters as a Map.
     * earliest: if a partition has a committed offset, resume from it; otherwise consume from the beginning
     * latest:   if a partition has a committed offset, resume from it; otherwise consume only newly produced data
     * none:     if every partition has a committed offset, resume from them; if any partition lacks one, throw an exception
     */

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop104:9092", //Kafka broker address
      "key.deserializer" -> classOf[StringDeserializer], //key deserializer
      "value.deserializer" -> classOf[StringDeserializer], //value deserializer
      "group.id" -> groupId, //consumer group
      "auto.offset.reset" -> "latest", //earliest or latest
      "enable.auto.commit" -> (true: java.lang.Boolean) //let the consumer commit offsets automatically
    )

    val topics = Array("car")

    //3. Create the Kafka DStream with KafkaUtils.
    //The direct (receiver-less) approach recommended by the docs; it uses the underlying Kafka consumer API and is more efficient.
    val kafkaDSteam = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    //Schema of the incoming records
    val schema = StructType(List(
      StructField("cameraId", StringType),
      StructField("time", StringType),
      StructField("lane_position", StringType),
      StructField("carType", StringType),
      StructField("speed", StringType),
      StructField("space", StringType)))

    //4. Write the aggregated results to Redis
    var jedis: Jedis = null
    //Redis pipeline, used to run the writes as one transaction
    var pipeline: Pipeline = null

    var spark: SparkSession = null

    /**
     * Parse each JSON message into a Row, build an RDD[Row], then create a DataFrame from the schema.
     * Mainline left : V158
     * Mainline right: V005
     */
    kafkaDSteam.map(record => JZWUtil.handlerMessage2Row(record.value())).foreachRDD(rdd => {
      if (!rdd.isEmpty()) { //only process non-empty batches
        if (spark == null) {
          spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        }
        val df: DataFrame = spark.createDataFrame(rdd, schema)
        val map = getTime()

        //Mainline left: filter by camera id and by the start of the current 5-second window
        val dfV158 = df.filter(" cameraId =='V158' and time >" + map.get("sdate").get).toDF()
        //Keep two columns and count the rows
        val countV158 = dfV158.select("time", "cameraId").count()

        //Mainline right: same time filter for camera V005
        val dfV005 = df.filter(" cameraId =='V005' and time >" + map.get("sdate").get).toDF()
        val countV005 = dfV005.select("time", "cameraId").count()

        //Same time filter for camera V024
        val dfV024 = df.filter(" cameraId =='V024' and time >" + map.get("sdate").get).toDF()
        val countV024 = dfV024.select("time", "cameraId").count()

        try {
          //Borrow a connection from the Jedis pool
          if (jedis == null) {
            jedis = JedisConnectionPool.getConnections()
          }
          jedis.select(3) //db 3 (Redis ships with 16 databases by default)

          //Open a pipeline
          pipeline = jedis.pipelined()
          //Start a MULTI block on the pipeline
          pipeline.multi()

          //Write the computed results

          /*
           * pipeline.hset(key, field, value) would overwrite the field;
           * hincrBy increments the field if it exists and creates it otherwise.
           */
          //5-second real-time counter
          pipeline.hincrBy("SV158", format.format(map.get("edate").get), countV158)

          //per-minute real-time counter
          pipeline.hincrBy("MV158", sdf.format(map.get("edate").get), countV158)

          //per-hour real-time counter
          pipeline.hincrBy("HV158", hourSdf.format(map.get("edate").get), countV158)

          //per-day real-time counter
          pipeline.hincrBy("DV158", daysdf.format(map.get("edate").get), countV158)

          //whole-line counter
          pipeline.hincrBy("allM", sdf.format(map.get("edate").get), countV158)

          //V005
          pipeline.hincrBy("SV005", format.format(map.get("edate").get), countV005)
          pipeline.hincrBy("MV005", sdf.format(map.get("edate").get), countV005)
          pipeline.hincrBy("HV005", hourSdf.format(map.get("edate").get), countV005)
          pipeline.hincrBy("DV005", daysdf.format(map.get("edate").get), countV005)

          //whole-line counter
          pipeline.hincrBy("allM", sdf.format(map.get("edate").get), countV005)

          //V024
          pipeline.hincrBy("HV024", hourSdf.format(map.get("edate").get), countV024)
          pipeline.hincrBy("DV024", daysdf.format(map.get("edate").get), countV024)
          //Queue EXEC to commit the transaction, then flush the pipeline
          pipeline.exec()
          pipeline.sync()

        } catch {
          case e: Exception => {
            e.printStackTrace()
            pipeline.discard() //drop the queued commands
            ssc.stop(true) //graceful shutdown
          }
        } finally {
          if (pipeline != null) {
            pipeline.close()
            pipeline = null
          }
          if (jedis != null) {
            jedis.close() //return the connection to the pool
            jedis = null //so the next batch borrows a fresh connection instead of reusing a closed one
          }
        }
      }
    })
    //Start the streaming job
    ssc.start()

    //The driver waits for the streaming job; when the job terminates, the driver terminates as well
    ssc.awaitTermination()
  }
  def getTime(): mutable.Map[String, Long] = {
    //Work out the next 5-second boundary
    val date: Calendar = Calendar.getInstance()
    val indexMinute = format.format(date.getTime())
    var dt: String = null
    val scornd = fmtScornd.format(date.getTime)
    if (Integer.valueOf(scornd) % 5 != 0) {
      val rs: Int = Integer.valueOf(scornd) / 5
      val min = (rs * 5 + 5).toString()
      val builderDate = new StringBuilder(indexMinute).replace(17, 19, min)
      dt = builderDate.toString()
    } else {
      dt = indexMinute
    }

    //Work out where the previous 5-second window ended
    val time: Date = format.parse(dt.toString())
    val sdate: Calendar = Calendar.getInstance()
    sdate.setTime(time)
    sdate.add(Calendar.SECOND, -5)

    val map: mutable.Map[String, Long] = mutable.Map()
    map("sdate") = sdate.getTimeInMillis.toLong //millisecond timestamp, used for the time comparison in the filters
    map("edate") = format.parse(dt).getTime().longValue() //millisecond timestamp; its formatted form is the Redis field
    map
  }

}
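
The listing relies on two project helpers that the post does not show: JZWUtil.handlerMessage2Row and JedisConnectionPool. Their real implementations are unknown; the sketch below is only a plausible version of handlerMessage2Row, assuming each Kafka value is a JSON object whose keys match the schema above and using fastjson for parsing (the library choice is an assumption, not something stated in the post).

package com.lg.bigdata.utils

import com.alibaba.fastjson.JSON
import org.apache.spark.sql.Row

object JZWUtil {
  //Hypothetical sketch: turn one Kafka message (a JSON string) into a Row
  //whose column order matches the StructType defined in KafkaRedis
  def handlerMessage2Row(message: String): Row = {
    val json = JSON.parseObject(message)
    Row(
      json.getString("cameraId"),
      json.getString("time"),
      json.getString("lane_position"),
      json.getString("carType"),
      json.getString("speed"),
      json.getString("space"))
  }
}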

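JedisConnectionPool.getConnections() is likewise a project-specific wrapper around a Jedis connection pool. A minimal sketch, assuming Redis runs on hadoop104 at the default port 6379 (host, port, and pool sizes are placeholders, not values from the post):

package com.lg.blgdata.utils

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object JedisConnectionPool {
  //Pool settings are placeholders; tune them for the real deployment
  private val config = new JedisPoolConfig()
  config.setMaxTotal(20)
  config.setMaxIdle(10)

  //Host and port are assumptions; the post only names the Kafka broker host
  private val pool = new JedisPool(config, "hadoop104", 6379)

  def getConnections(): Jedis = pool.getResource
}

Once the job is running, the written counters can be inspected with redis-cli, for example SELECT 3 followed by HGETALL MV158 for the per-minute counts of camera V158.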
  
