spark streaming拉取kafka数据, 结合sparkSql dataframe hive存储计算,输出到mysql

spark streaming拉取kafka数据, 结合sparkSql dataframe hive存储计算,输出到mysql.

数据清洗过程比较复杂,没办法,上游给的屡一样的数据,正则去解析并全量按时间取最新一条去重。

每天kafka数据5千万条。1分钟要刷一次,看上去还可以满足。只有屡一样去堆代码了。

package biReportJob.streaming

import java.io.InputStreamReader
import java.sql.DriverManager
import java.text.SimpleDateFormat
import java.util.{Calendar, Locale}

import common.getEnv
import io.thekraken.grok.api.Grok
import org.apache.commons.lang3.StringUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010._
import scripts.mongoEtls_zt.ZtLxjETLDailyPushOrder.dataExtractRegexStr
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import scala.util.parsing.json.JSON

/**
* create by roy 2019-08-15
* spark streaming 消费kafka
* 订单全量去重,取最新一条数据
*biReportJob.streaming.LxjOrderStreaming
*/
object LxjOrderStreaming extends Logging {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.streaming.kafka010.DirectKafkaInputDStream").setLevel(Level.WARN)
Logger.getRootLogger.setLevel(Level.WARN)

def main(args: Array[String]): Unit = {
val Array(day_hh, streamingSecondsa) = (args ++ Array(null, null)).slice(0, 2)
var bootstrap_servers = "192.168.7.14:9092,192.168.7.14:9093,192.168.7.15:9092"
if ("local".equals(getEnv("env"))) {
bootstrap_servers = "10.200.102.84:9092"
}
val spark = SparkSession
.builder()
.appName("kafka2HiveStreaming")
.enableHiveSupport()
.getOrCreate()
var streamingSeconds = 60
if (StringUtils.isNotBlank(streamingSecondsa)) {
streamingSeconds = streamingSecondsa.toInt
}
val sparkContext: SparkContext = spark.sparkContext
val ssc: StreamingContext = new StreamingContext(sparkContext, Seconds(streamingSeconds))
val topics = Array("zt_log")
val table_after = getEnv("env")
/**
* enable.auto.commit 数据处理之后提交,如果设为true的话,那么意味着offsets会按照auto.commit.interval.ms中所配置的间隔来周期性自动提交到Kafka中
*
* earliest 那么任务会从最开始的offset读取数据,相当于重播所有数据。这样的设置会使得你的任务重启时将该topic中仍然存在的数据再读取一遍。这将由你的消息保存周期来决定你是否会重复消费。
* latest 那么你的应用启动时会从最新的offset开始读取,这将导致你丢失数据。这将依赖于你的应用对数据的严格性和语义需求,这或许是个可行的方案。
*/
var kafkaParams = Map[String, Object](
"bootstrap.servers" -> bootstrap_servers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "g1_post_push_reduce_local",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
if ("999".equals(day_hh)) {
kafkaParams = Map[String, Object](
"bootstrap.servers" -> bootstrap_servers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "g1_post_push_reduce_local",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean))
}

val dateFormate: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.US)
val cal = Calendar.getInstance
val c_yymmdd = dateFormate.format(cal.getTime)
println("env=" + getEnv("env") + " day_hh=" + day_hh + " cal.getTime.getTime=" + cal.getTime.getTime + " c_yymmdd=" + c_yymmdd + " bootstrap_servers=" + bootstrap_servers)
val messagesDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
// val lines = messages.map(_.value)
messagesDStream.foreachRDD(rdd => {
// 获取偏移量
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
import spark.implicits._
val lines = rdd.map(_.value)
val wordsDataFrame1 = lines.map(line => {
val max_time: Long = Calendar.getInstance.getTime.getTime / 1000
var jsonstr = ""
try {
val grok = SparkSessionSingleton.getGrokInstance()
val m = grok.`match`(line)
m.captures
jsonstr = m.toJson()
} catch {
case e: java.lang.Exception => // Exception
println("Exception" + e)
}
var rtLine = "msg_null"
if (StringUtils.isNotBlank(jsonstr)) {
// println(" m.toJson()=" + jsonstr)
val rooMap = JSON.parseFull(jsonstr).get.asInstanceOf[Map[String, Any]]
if (rooMap.contains("flag") && rooMap.contains("msg") && rooMap.contains("actionName") &&
rooMap.getOrElse("flag", 0).asInstanceOf[Double].toInt == 1) {
val msg = rooMap.getOrElse("msg", "").toString
val actionName = rooMap.getOrElse("actionName", "").toString
if (StringUtils.isNotBlank(msg) && StringUtils.isNotBlank(actionName)) {
if ("candao.order.postOrder".equals(actionName) || "candao.order.pushOrder".equals(actionName)) {
val jsonMsg = "{" + msg.replaceFirst(dataExtractRegexStr, "")
val jsonMap = JSON.parseFull(jsonMsg).get.asInstanceOf[Map[String, Any]]
// println("jsonMap==" + jsonMap)
if (jsonMap.contains("data") && jsonMap.contains("data") != None && jsonMap.get("data").get.isInstanceOf[Map[String, Any]]) {
val msgData = jsonMap.get("data").get.asInstanceOf[Map[String, Any]]
val actionName = jsonMap.getOrElse("actionName", "")
var orderMoney = 0.0
var orderNo = ""
var otype = -1
var orderId = ""
var orderDate = "20180102"
if (actionName.equals("candao.order.postOrder")) {
orderMoney = msgData.getOrElse("merchantPrice", 0.0).asInstanceOf[Double]
otype = 1
//extid,补上orderId,做统一去重用
orderId = "1" + msgData.getOrElse("extId", "").toString
// orderDate = msgData.getOrElse("orderDate", "20180101").toString
orderDate = msgData.getOrElse("createTime", "20180101").toString.substring(0, 8)
} else if (actionName.equals("candao.order.pushOrder")) {
orderMoney = msgData.getOrElse("price", 0.0).asInstanceOf[Double] //price
orderNo = msgData.getOrElse("orderNo", "").toString //price
orderId = "2" + msgData.getOrElse("orderId", "").asInstanceOf[Double].toInt.toString
orderDate = msgData.getOrElse("orderDate", "20180101").toString.substring(0, 10).replace("-", "")
otype = 2
}
val extId = msgData.getOrElse("extId", "").toString
val extOrderId = msgData.getOrElse("extOrderId", "").toString
val extStoreId = msgData.getOrElse("extStoreId", "").toString
val stype = msgData.getOrElse("type", -899.0).asInstanceOf[Double].toInt
val payType = msgData.getOrElse("payType", -899.0).asInstanceOf[Double].toInt
val orderStatus = msgData.getOrElse("orderStatus", -899.0).asInstanceOf[Double].toInt
val createTime = msgData.getOrElse("createTime", 899).toString.toLong
println("ok data extId=" + extId)
rtLine = extId + "," + extOrderId + "," + extStoreId + "," + stype + "," + payType + "," + orderMoney + "," + orderStatus + "," + orderDate + "," + createTime + "," + otype + "," + orderNo + "," + orderId + "," + max_time
}
}
}
}
}
rtLine
}).filter(row => row != null && !row.equals("msg_null"))
.map(w => {
val row_data = w.split(",")
StreamingStoreEntity(row_data(0), row_data(1), row_data(2), row_data(3).toInt, row_data(4).toInt, row_data(5).toDouble, row_data(6).toInt, row_data(7), row_data(8).toLong, row_data(9).toInt, row_data(10), row_data(11), row_data(12).toLong)
}).toDS()
wordsDataFrame1.cache()
println("wordsDataFrame1 show")
wordsDataFrame1.show()
if (!wordsDataFrame1.take(1).isEmpty) {
wordsDataFrame1.sqlContext.setConf("hive.exec.dynamic.partition", "true")
wordsDataFrame1.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
if (!tableExists(s"zt.lxj_streaming_order_data_$table_after", wordsDataFrame1.sparkSession)) {
println("table is not create table lxj_streaming_order_data_")
wordsDataFrame1.withColumn("part_job", lit(c_yymmdd)).write.mode(SaveMode.Overwrite).partitionBy("part_job").saveAsTable(s"zt.lxj_streaming_order_data_$table_after")
} else {
wordsDataFrame1.withColumn("part_job", lit(c_yymmdd)).write.mode(SaveMode.Append).partitionBy("part_job").saveAsTable(s"zt.lxj_streaming_order_data_$table_after")
}
val yymmdd = c_yymmdd.replace("-", "")
val DF1 = spark.sql(
s"""
|select t1.* from (
|select *, row_number() over(partition by orderId ORDER BY createTime desc) rn
|from zt.lxj_streaming_order_data_$table_after where part_job='$c_yymmdd' and orderDate='$yymmdd'
|) t1 where t1.rn=1
""".stripMargin)
DF1.createOrReplaceTempView("table_tmp_1")
val sql2 =
s"""
|select orderDate,extStoreId,otype,max(jobtime) jobtime,count(1) orderNum,round(sum(orderMoney),4) orderMoney from table_tmp_1
|group by orderDate,extStoreId,otype
""".stripMargin
val DF2 = spark.sql(sql2)
DF2.createOrReplaceTempView("result_left_city_tmp")
spark.sqlContext.setConf("spark.sql.crossJoin.enabled", "true")
val resultCityDF = spark.sql(
"""
|select t1.*,t2.provinceid,t2.provincename,t2.cityid,t2.cityname,t2.storename from result_left_city_tmp t1
|left join zt.lxj_store t2
|on t1.extStoreId=t2.extstoreid
""".stripMargin)
resultCityDF.cache()
// resultCityDF.checkpoint()
if (!resultCityDF.take(1).isEmpty) {
savedbresult(resultCityDF)
} else {
println("update time")
updateTime()
}
} else {
println("update time")
updateTime()
}
// some time later, after outputs have completed
messagesDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
// println(s"========= $time =========")
})


// wordsDStream.print()
// Start the computation
ssc.start()
ssc.awaitTermination()
}


def savedbresult(dataDF: DataFrame): Unit = {
val (user, passwd, url) = common.LocalMysqlSettings("finereport.user", "finereport.passwd", "finereport.url", "finereportmysql.properties")
println("mysql url==" + url)
val insertSql =
"""
|INSERT INTO lxj_order_theday_board_streaming_test (`provinceid`, `provincename`, `cityid`, `cityname`,`orderDate`, extStoreId,`orderNum`, `orderMoney`, `dt_time`,otype,storename)
| VALUES (?,?,?,?,?,?,?,?,?,?,?)
""".stripMargin
val conn = DriverManager.getConnection(url, user, passwd)
conn.setAutoCommit(false)
try {
val truncatesql = conn.prepareStatement("delete from lxj_order_theday_board_streaming_test")
// val truncatesql = conn.prepareStatement("TRUNCATE lxj_order_theday_board")
truncatesql.execute()
val dataps = conn.prepareStatement(insertSql)
val list = dataDF.rdd.collect().toList
// println("list prin=" + list)
list.foreach(unit => {
dataps.setInt(1, unit.getAs[Int]("provinceid"))
dataps.setString(2, unit.getAs[String]("provincename"))
dataps.setInt(3, unit.getAs[Int]("cityid"))
dataps.setString(4, unit.getAs[String]("cityname"))
dataps.setString(5, unit.getAs[String]("orderDate"))
dataps.setString(6, unit.getAs[String]("extStoreId"))
dataps.setLong(7, unit.getAs[Long]("orderNum"))
dataps.setDouble(8, unit.getAs[Double]("orderMoney"))
dataps.setInt(9, unit.getAs[Long]("jobtime").toInt)
dataps.setInt(10, unit.getAs[Int]("otype"))
dataps.setString(11, unit.getAs[String]("storename"))
dataps.addBatch()
})
dataps.executeBatch()
conn.commit()
} catch {
case e: Exception => {
e.printStackTrace()
println(e)
conn.rollback()
throw new Exception("mysql error =" + e.getMessage)
}
} finally {
conn.close()
}
}

def updateTime(): Unit = {
val (user, passwd, url) = common.LocalMysqlSettings("finereport.user", "finereport.passwd", "finereport.url", "finereportmysql.properties")
val conn = DriverManager.getConnection(url, user, passwd)
try {
val truncatesql = conn.prepareStatement("update lxj_order_theday_board_streaming_test set db_uptime=now()")
truncatesql.execute()
} catch {
case e: Exception => {
println(e)
throw new Exception("mysql error =" + e.getMessage)
}
} finally {
conn.close()
}
}

def tableExists(table: String, spark: SparkSession) =
spark.catalog.tableExists(table)
}

/** Case class for converting RDD to DataFrame */
case class Record(word: String)

case class StreamingStoreEntity(extId: String, extOrderId: String, extStoreId: String, stype: Int, payType: Int, orderMoney: Double, orderStatus: Int, orderDate: String, createTime: Long, otype: Int, orderNo: String, orderId: String, jobTime: Long)

/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
@transient private var instance: SparkSession = _

@transient private var grok: Grok = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
// val conf = new SparkConf().setMaster("yarn").setAppName("LxjOrderStreaming")
instance = SparkSession
.builder
.enableHiveSupport()
.master("yarn")
.appName("SparkSessionSingleton")
.config(sparkConf)
// .config("spark.files.openCostInBytes", PropertyUtil.getInstance().getProperty("spark.files.openCostInBytes"))
// .config("hive.metastore.uris","thrift://namenode01.cd:9083")連接到hive元數據庫 --files hdfs:///user/processuser/hive-site.xml 集群上運行需要指定hive-site.xml的位置
// .config("spark.sql.warehouse.dir","hdfs://namenode01.cd:8020/user/hive/warehouse")
.getOrCreate()
}
instance
}

val pattern: String = "\[(?<createTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\] \[%{DATA:level}\] \[%{DATA:className}\] \[%{DATA:methodName}\] \[%{DATA:thread}\] \[%{GREEDYDATA:msg}\] \[%{NUMBER:clientType:int}\] \[%{NUMBER:step:int}\] \[%{NUMBER:flag:int}\] \[%{DATA:ip}\] \[%{DATA:clientIp}\] \[%{NUMBER:costTime:int}\] \[%{DATA:isErr}\] \[%{DATA:errName}\] \[%{DATA:logId}\] \[%{DATA:sysName}\] \[%{DATA:actionName}\] \[%{DATA:apiName}\] \[%{DATA:platformKey}\]"

def getGrokInstance(): Grok = {
if (grok == null) {
grok = new Grok()
val inputStream = this.getClass.getClassLoader.getResourceAsStream("patterns.txt")
grok.addPatternFromReader(new InputStreamReader(inputStream))
grok.compile(pattern)
}
grok
}
}


package biReportJob.streaming

import java.text.SimpleDateFormat
import java.util.{Calendar, Locale}

import common.getEnv
import org.apache.spark.sql.SparkSession

object CleamDataJob {

def main(args: Array[String]): Unit = {

val sparkBuilder = SparkSession.builder()
val cal = Calendar.getInstance()
cal.add(Calendar.DATE, -3)
if ("local".equals(getEnv("env"))) {
sparkBuilder.master("local[*]").config("hive.metastore.uris", "thrift://hdp02:9083")
}
val spark = sparkBuilder
.appName("CleamDataJob")
.enableHiveSupport()
.getOrCreate()
val dateFormate: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.US)
val c_yymmdd = dateFormate.format(cal.getTime)
val table_after = getEnv("env")
println("env=" + getEnv("env") + " c_yymmdd=" + c_yymmdd)
spark.sql(s"ALTER TABLE zt.lxj_streaming_order_data_$table_after DROP IF EXISTS PARTITION(part_job='$c_yymmdd')")


}

//查看原数据
//SELECT sum(orderMoney) from(
// select t1.* from (
// select *, row_number() over(partition by orderId ORDER BY createTime desc) rn
// from zt.lxj_streaming_order_data_stag where part_job='2019-08-16' and orderDate='20190816'
// ) t1 where t1.rn=1
//) as tt1
//
//SELECT sum(orderMoney) FROM zt.lxj_streaming_order_data_stag where orderdate='20190816' and part_job='2019-08-16'


}

————————————————
版权声明:本文为CSDN博主「java的爪哇」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/liangrui1988/article/details/99647065

原文地址:https://www.cnblogs.com/javalinux/p/15066502.html