Spark Streaming with MLlib Machine Learning


Background: a trained machine learning model can be deployed inside a Spark Streaming application, for example one that consumes a Kafka data source.
Below is my preliminary approach; comments and corrections from more experienced readers are welcome.
 
import java.util
import java.util.Properties

import mlaas.spark.listener.utils.JSONUtil
import mlaas.spark.main.SparkJob
import mlaas.spark.main.utils.JsonUtils
import kafka.javaapi.producer.Producer
import kafka.producer.{KeyedMessage, ProducerConfig}
import kafka.serializer.StringDecoder
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.io.Source
import scala.xml.{Elem, XML}
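// NOTE: the Logging trait extended by StreamingExecutePref below is assumed to be provided by the project's own utilities (its import is not shown in this listing).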

/**
* Created by 15020218 on 2017/1/17.
* A machine learning model can be deployed inside a Spark Streaming application, e.g. one fed by a Kafka data source.
* Processing logic:
* 1. Each micro-batch of the Kafka stream is converted into a DataFrame.
* 2. The model scores that DataFrame, producing a new DataFrame.
* 3. The final DataFrame is written back to Kafka through a producer.
* (A distilled sketch of this flow is given right after the full listing below.)
* Highlights:
* 1. A complete machine learning experiment pipeline can run inside the DataFrame-to-DataFrame transformation.
* 2. Checkpointing is supported.
* 3. Flow control (rate limiting via spark.streaming.kafka.maxRatePerPartition) is supported.
*
* Notes on this first version:
* 1. Only a single Kafka input and a single Kafka output are currently supported.
* 2. Inputs: the experiment configuration sample.xml, the schema of the input source, the field delimiter of the input data, and the topic & broker list of the input and output Kafka sources.
*
* Submit command format:
* source change_spark_version spark-2.1.0.2;export HADOOP_USER_NAME=mlp; /home/bigdata/software/spark-2.0.2.3-bin-2.4.0.10/bin/spark-submit --master yarn-cluster --driver-memory 4g --num-executors 4 --executor-memory 4g --conf spark.yarn.maxAppAttempts=1 --jars /home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar,/home/mlp/mlaas/lib/json-serde.jar --class com.suning.mlaas.spark.StreamingExecutePref --jars ssmpConfigration.json,expConfigration.xml mlaas-mllib.jar
*/
object StreamingExecutePref extends Logging {

var reader_topic = ""
var reader_brokerLst = ""
var writer_topic = ""
var writer_brokerLst = ""
var tableName = ""
var jsonStr = ""
// used as the temporary directory when copying model files
var ssmpJobId = ""
def main(args: Array[String]) {
val argfile = Source.fromFile("ssmpConfigration.json","UTF-8") // read as UTF-8 to avoid garbled Chinese characters
val jsonArgStr = argfile.mkString
log.info("argfile is " + jsonArgStr)
val jsonArg = JsonUtils.parseObject(jsonArgStr)
val batchTime = Integer.parseInt(jsonArg.getString("batchTime"))
ssmpJobId = jsonArg.getString("id")
val mlData = jsonArg.getJSONArray("mlData")
if(mlData.size() != 2){
log.error("Only one input and one output data source are currently supported")
System.exit(1)
}

val char_Sep = ","
val dataSource1 = mlData.getJSONObject(0)
val dataSource2 = mlData.getJSONObject(1)
List(dataSource1,dataSource2).map(ds => {
if("IN".equalsIgnoreCase(ds.getString("dsType"))){
reader_topic = ds.getString("topic")
reader_brokerLst = ds.getString("brokerList")
tableName = ds.getString("dsName")
log.info("reader_topic is " + reader_topic )
log.info("reader_brokerLst is " + reader_brokerLst )

if(tableName.indexOf(".") != -1){
tableName = tableName.substring(tableName.indexOf(".")+1)
}
log.info("tableName is " + tableName )
jsonStr = ds.getJSONArray("dsSchema").toString
log.info("jsonStr is " + jsonStr )
}else {
writer_topic = ds.getString("topic")
writer_brokerLst = ds.getString("brokerList")
log.info("writer_topic is " + writer_topic )
log.info("writer_brokerLst is " + writer_brokerLst )
}
})

val checkPointDir = s"/user/mlp/mlaas/ssmpcheckpoint/$ssmpJobId/"
val jobXml = XML.loadFile("expConfigration.xml")
/**
* 1. Create the StreamingContext
*/
// @transient
val ssc = StreamingContext.getActiveOrCreate(checkPointDir,
() => {
createSsc(checkPointDir, batchTime, jobXml, char_Sep)
}
)
ssc.start()
ssc.awaitTermination()
}

//sep: the input field delimiter, e.g. a comma; currently one input and one output source are supported
def kafkaTopic2tmptable(jobXml: Elem, sep: String, srcTableName: String, jsonStr: String, topics: String, brokerLst: String, writeKafkaTopic: String, writeKafkaBrokerLst: String, ssc: StreamingContext) = {
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerLst)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)

val orderedType = Map(
"boolean" -> BooleanType,
"byte" -> ByteType,
"short" -> ShortType,
"int" -> IntegerType,
"long" -> LongType,
"float" -> FloatType,
"double" -> DoubleType,
// "decimal" -> DecimalType,
"time" -> TimestampType,
"date" -> DateType,
"string" -> StringType,
"binary" -> BinaryType
)

val colArr = JsonUtils.parseArray(jsonStr)
var ii = 0
val fields = new util.ArrayList[StructField]()
val smap = scala.collection.mutable.HashMap[Int, DataType]()
while (ii < colArr.size()) {
val obj = colArr.getJSONObject(ii)
smap += (ii -> orderedType.getOrElse(obj.getString("columnType"), StringType))
val sf = StructField(obj.getString("columnName"), orderedType.getOrElse(obj.getString("columnType"), StringType), nullable = true)
fields.add(sf)
ii = ii + 1
}
val schema = StructType(fields)
log.info("******************************************************************************")
log.info("数据源schema: " + schema.mkString(","))
val rowRDD = messages.map(_._2).map(attr => {
(attr.toString.split(sep))
})
log.info("******************************************************************************")
log.info("go to foreachRDD: " + schema.mkString(","))
rowRDD.foreachRDD(rdd => {
// @transient
val sqlContext = new SQLContext(rdd.sparkContext)
// @transient
val hiveContext = new HiveContext(rdd.sparkContext)
if (rdd.count() > 0) {
// println(rdd.take(1)(0).mkString(":") + " ; count is :"+ rdd.count())
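// convert each Array[String] into a typed Row matching the declared schema (see genericToRow / getValue below)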
val rowRdd = rdd.map(Row(_)).map(row => row.getAs[Array[Any]](0))
.map(row => new GenericRow(row))
.map(genericToRow(_, schema, smap))
.map(row => row)
val df = sqlContext.createDataFrame(rowRdd, schema)
df.createOrReplaceTempView(srcTableName)
log.info("******************************************************************************")
log.info("table count is : " + sqlContext.table(srcTableName).count())
// println("table count is : " + sqlContext.table(srcTableName).count())
/**
* 3. Process the converted DataFrame the same way the cbt-style offline jobs do:
* nodes that read a data source are left untouched,
* nodes that write a data source are changed to write the Kafka stream instead.
*/
val saveNodeSourceTable = processExpLogic(jobXml, hiveContext)
/**
* 4. Write the final, fully processed DataFrame to the Kafka sink. Steps 3 and 4 both run inside the DStream's foreachRDD logic.
*/
val targetdf = sqlContext.table(saveNodeSourceTable)
targetdf.foreachPartition(it => writeKafka(it, sep, srcTableName, writeKafkaTopic, writeKafkaBrokerLst))
}
})
}

def getValue(value: String, dataType: DataType): Any = {
if (IntegerType == dataType) {
Integer.parseInt(value)
} else if (DoubleType == dataType) {
java.lang.Double.parseDouble(value)
} else if (LongType == dataType) {
java.lang.Long.parseLong(value)
} else {
value
}
}

def genericToRow(row: GenericRow, schema: StructType, smap: scala.collection.mutable.HashMap[Int, DataType]): Row = {
val cols: Seq[Any] = (0 to smap.size - 1).map(x => getValue(row.getAs(x).toString, smap.get(x).get))
new GenericRowWithSchema(cols.toArray, schema)
}

/**
* Runs all operations except the read-data-source and write-data-source nodes.
*
* @param jobXml
* @param hiveContext
*/
def processExpLogic(jobXml: Elem, hiveContext: HiveContext): String = {
var saveNodeSourceTable = ""
val list = (jobXml \ "job").map(n => {
(n \ "@CDATA", n.text);
n.text
})
val it = list.iterator
while (it.hasNext) {
val value = it.next().replace("\\", "")
val json = JSONUtil.toJSONString(value)
val classStr = json.get("class").toString

//skip here: the model node and the write-data-source node are handled in the branches below
if (!classStr.contains("DealWithSave") && !classStr.contains("DealWithModel")) {
val argsStr = json.get("params").toString
/** ********************* handle table names: registerTempTable cannot take a db-qualified name such as mlp.zpc - begin **/
val params = JsonUtils.parseObject(argsStr)
val keySet = params.keySet()
val itk = keySet.iterator()
while (itk.hasNext) {
val key = itk.next().toString
if (key.startsWith("source") || key.startsWith("target_")) {
val value = params.get(key).toString
if (value.contains(".")) {
val newVal = value.substring(value.indexOf(".") + 1)
params.put(key, newVal)
}
}
}
/** ********************* handle table names: registerTempTable cannot take a db-qualified name such as mlp.zpc - end **/
val obj: SparkJob = Class.forName(classStr).getMethod("self").invoke(null).asInstanceOf[SparkJob]
obj.runCbt(hiveContext, params)
} else if (classStr.contains("DealWithSave")) {
val argsStr = json.get("params").toString
/** ********************* handle table names: registerTempTable cannot take a db-qualified name such as mlp.zpc - begin **/
val params = JsonUtils.parseObject(argsStr)
val keySet = params.keySet()
val itk = keySet.iterator()
while (itk.hasNext) {
val key = itk.next().toString
if (key.startsWith("source_")) {
val value = params.get(key).toString
if (value.contains(".")) {
val newVal = value.substring(value.indexOf(".") + 1)
saveNodeSourceTable = newVal
//params.put(key, newVal)
}
}
}
}
else if (classStr.contains("DealWithModel")) {//model node: copy the model files to the job's execution directory before running it
val argsStr = json.get("params").toString
/** ********************* check the temporary directory, copy the model files, and rewrite the path attribute in the params JSON - begin **/
val params = JsonUtils.parseObject(argsStr)
val keySet = params.keySet()
val itk = keySet.iterator()
while (itk.hasNext) {
val key = itk.next().toString
if (key.startsWith("path") ) {
val value = params.get(key).toString
//the original path looks like: /user/mlp/mlaas/savedModel/transformers/experiment-2852/node-17144/Model-20170223202226
val newPath = value.replace("savedModel", "ssmpJob/" + ssmpJobId)
params.put(key, newPath)
log.info("******************************************************************************")
log.info("newPath : " + newPath + " oldPath : " + value)
val conf = new Configuration()
val fs = FileSystem.get(conf)
if(!fs.exists(new Path(newPath))){
val src = new Path(value)
val dst = new Path(newPath)
FileUtil.copy(fs, src, fs, dst, false, true, conf)
}
}
}
/** ********************* check the temporary directory, copy the model files, and rewrite the path attribute in the params JSON - end **/
val obj: SparkJob = Class.forName(classStr).getMethod("self").invoke(null).asInstanceOf[SparkJob]
obj.runCbt(hiveContext, params)
}
}
saveNodeSourceTable
}

/**
* Notes on checkpoint support:
* 1. The processing logic must be placed inside this method body.
* 2. The SQLContext must be created from rdd.sparkContext,
* not from the StreamingContext passed in as a parameter,
* e.g.: val sqlContext = new SQLContext(rdd.sparkContext)
* @param checkPointDir
* @param sec
* @param jobXml
* @param char_sep
* @return
*/
def createSsc(checkPointDir: String, sec : Int, jobXml: Elem, char_sep: String): StreamingContext = {
// Create the streaming context with a batch interval of sec seconds
val sparkConf = new SparkConf().setAppName("StreamingExecutePref")
.set("spark.streaming.kafka.maxRatePerPartition", "10000")
val ssc = new StreamingContext(sparkConf, Seconds(sec))
// val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(sec))
/**
* 2. Handle the data sources: map each Kafka micro-batch RDD to a temp table so that the other job classes can process it.
* Each input/output data source consists of (brokerLst, topics, colNameList(name, type)); the input format is comma-separated by default.
* Create direct kafka stream with brokers and topics
*/
kafkaTopic2tmptable(jobXml, char_sep, tableName, jsonStr, reader_topic, reader_brokerLst, writer_topic, writer_brokerLst, ssc)

ssc.checkpoint(checkPointDir)
ssc
}

def writeKafka(it: Iterator[Row], sep: String, srcTableName: String, topic: String, brokerLst: String)
= {

val props = new Properties()
props.put("metadata.broker.list", brokerLst)
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("request.required.acks", "1")
props.put("producer.type", "async")
val kafkaConfig = new ProducerConfig(props)
val producer = new Producer[String, String](kafkaConfig)
while (it.hasNext) {
val row = it.next()
log.info("******************************************************************************")
log.info("topic : " + topic + " msg : " + row.mkString(sep))
producer.send(new KeyedMessage[String, String](topic, row.mkString(sep)))
}
}
}
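
For reference, the core flow described above (Kafka micro-batch -> DataFrame -> model scoring -> DataFrame -> Kafka) can be boiled down to the following minimal sketch. It is only an illustration under assumptions: it uses a standard Spark ML PipelineModel loaded from a hypothetical path instead of the project's DealWith* job classes, hypothetical topic and broker names, and it leaves out the checkpointing, flow control, XML-driven job chain and schema handling of the full listing.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingScoringSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingScoringSketch")
    val ssc = new StreamingContext(conf, Seconds(20))

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092") // hypothetical brokers
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("input_topic")) // hypothetical topic

    // A previously trained and saved Spark ML pipeline (hypothetical path).
    val model = PipelineModel.load("/user/mlp/mlaas/savedModel/some-model")

    stream.map(_._2).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._
        // 1. micro-batch -> DataFrame; a single string column here, whereas the full
        //    listing builds a typed schema from the configured column list
        val df = rdd.toDF("line")
        // 2. the model scores the DataFrame and produces a new DataFrame
        //    (in practice df must expose the feature columns the model was trained on)
        val scored = model.transform(df)
        // 3. write the result back to Kafka, one message per row
        //    (replace println with a producer send, as in writeKafka above)
        scored.toJSON.rdd.foreachPartition(it => it.foreach(println))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

The full class above additionally recovers the StreamingContext from the checkpoint directory and runs the experiment's node chain (processExpLogic) between steps 1 and 3.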
 



sample.xml looks like this:
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<jobs>
    <job><![CDATA[{"class":"mlaas.spark.main.DealWithSelect","expId":3109,"libjars":"/home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar,/home/mlp/mlaas/lib/json-serde.jar","nodeId":18788,"params":{"expId":3109,"nodeId":18788,"partitioncheckbox":"0","source":"mlp.adult_income","targetType":[{"targetName":"target_1","targetType":"dataset"}],"target_1":"mlp.tmp_mlp_table_3109_18788_1","user":"mlp"},"runnablejar":"/home/mlp/mlaas/mllib/mlaas-mllib.jar","user":"mlp"}]]></job>
    <job><![CDATA[{"class":"mlaas.spark.main.DealWithModel","expId":3109,"libjars":"/home/mlp/mlaas/lib/scalaz-core_2.10-7.2.2.jar,/home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar","nodeId":18791,"params":{"expId":3109,"modelId":"564","nodeId":18791,"path":"/user/mlp/mlaas/savedModel/transformers/experiment-2852/node-17144/Model-20170223202226","targetType":[{"targetName":"target_1","targetType":"model"}],"target_1":"/user/mlp/mlaas/transformers/experiment-3109/node-18791/","user":"mlp"},"runnablejar":"/home/mlp/mlaas/mllib/mlaas-mllib.jar","user":"mlp"}]]></job>
    <job><![CDATA[{"class":"mlaas.spark.main.DealWithSort","expId":3109,"nodeId":18789,"params":{"expId":3109,"k":"1000","nodeId":18789,"outputs":"age","sortOrder":"asc","source_1":"mlp.tmp_mlp_table_3109_18788_1","targetType":[{"targetName":"target_1","targetType":"dataset"}],"target_1":"mlp.tmp_mlp_table_3109_18789_1","user":"mlp"},"runnablejar":"/home/mlp/mlaas/mllib/mlaas-mllib.jar","user":"mlp"}]]></job>
    <job><![CDATA[{"class":"mlaas.spark.main.DealWithMutiMinMax","expId":3109,"libjars":"/home/mlp/mlaas/lib/scalaz-core_2.10-7.2.2.jar,/home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar","nodeId":18790,"params":{"expId":3109,"features":"fnlwgt,education_num,hours_per_week,capital_gain,age,capital_loss","keepOriginal":"1","nodeId":18790,"source_1":"mlp.tmp_mlp_table_3109_18789_1","targetType":[{"targetName":"target_1","targetType":"dataset"},{"targetName":"target_2","targetType":"transformer"}],"target_1":"mlp.tmp_mlp_table_3109_18790_1","target_2":"/user/mlp/mlaas/transformers/experiment-3109/node-18790/","user":"mlp"},"runnablejar":"/home/mlp/mlaas/mllib/mlaas-mllib.jar","user":"mlp"}]]></job>
    <job><![CDATA[{"class":"mlaas.spark.main.DealWithApplyRegModel","expId":3109,"libjars":"/home/mlp/mlaas/lib/scalaz-core_2.10-7.2.2.jar,/home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar","nodeId":18792,"params":{"expId":3109,"nodeId":18792,"source_1":"/user/mlp/mlaas/transformers/experiment-3109/node-18791/","source_2":"mlp.tmp_mlp_table_3109_18790_1","targetColumn":"sex","targetType":[{"targetName":"target_1","targetType":"dataset"}],"target_1":"mlp.tmp_mlp_table_3109_18792_1","user":"mlp"},"runnablejar":"/home/mlp/mlaas/mllib/mlaas-mllib.jar","user":"mlp"}]]></job>
    <job><![CDATA[{"class":"mlaas.spark.main.DealWithSave","expId":3109,"libjars":"/home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar","nodeId":18793,"params":{"expId":3109,"nodeId":18793,"source_1":"mlp.tmp_mlp_table_3109_18792_1","target":"mlp.zpcstreamingresult","targetType":[],"user":"mlp","writeType":"create"},"runnablejar":"/home/mlp/mlaas/mllib/mlaas-mllib.jar","user":"mlp"}]]></job>
</jobs>

The submit command looks like this:

spark-2.0.2.3-bin-2.4.0.10/bin/spark-submit --master yarn-cluster --driver-memory 4g  --num-executors 4  --executor-memory 4g  --conf spark.yarn.maxAppAttempts=1 --jars /home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar,/home/mlp/mlaas/lib/json-serde.jar --class mlaas.spark.StreamingExecutePref  --jars sample.xml mlaas-mllib.jar "{"batchTime":20,"mlData":[{"topic":"mlaas_event","brokerList":"10.27.189.238:9092,10.27.189.239:9092","dsName":"mlp.adult_income","dsSchema":[{"columnName":"age","columnType":"int","dataType":"none"},{"columnName":"workclass","columnType":"string","dataType":"none"},{"columnName":"fnlwgt","columnType":"int","dataType":"none"},{"columnName":"education","columnType":"string","dataType":"none"},{"columnName":"education_num","columnType":"int","dataType":"none"},{"columnName":"marital_status","columnType":"string","dataType":"none"},{"columnName":"occupation","columnType":"string","dataType":"none"},{"columnName":"relationship","columnType":"string","dataType":"none"},{"columnName":"race","columnType":"string","dataType":"none"},{"columnName":"sex","columnType":"string","dataType":"none"},{"columnName":"capital_gain","columnType":"int","dataType":"none"},{"columnName":"capital_loss","columnType":"int","dataType":"none"},{"columnName":"hours_per_week","columnType":"int","dataType":"none"},{"columnName":"native_country","columnType":"string","dataType":"none"},{"columnName":"income","columnType":"string","dataType":"none"}],"dsType":"IN","nodeInstanceId":18788},{"topic":"mlaas_writeTopic","brokerList":"10.27.189.238:9092,10.27.189.239:9092","dsName":"mlp.zpcstreamingresult","dsType":"OUT","nodeInstanceId":18793}]}"
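
The JSON argument at the end of the command above is a single line; reformatted for readability (content unchanged) it corresponds to the following structure:

{
  "batchTime": 20,
  "mlData": [
    {
      "topic": "mlaas_event",
      "brokerList": "10.27.189.238:9092,10.27.189.239:9092",
      "dsName": "mlp.adult_income",
      "dsSchema": [
        {"columnName": "age", "columnType": "int", "dataType": "none"},
        {"columnName": "workclass", "columnType": "string", "dataType": "none"},
        {"columnName": "fnlwgt", "columnType": "int", "dataType": "none"},
        {"columnName": "education", "columnType": "string", "dataType": "none"},
        {"columnName": "education_num", "columnType": "int", "dataType": "none"},
        {"columnName": "marital_status", "columnType": "string", "dataType": "none"},
        {"columnName": "occupation", "columnType": "string", "dataType": "none"},
        {"columnName": "relationship", "columnType": "string", "dataType": "none"},
        {"columnName": "race", "columnType": "string", "dataType": "none"},
        {"columnName": "sex", "columnType": "string", "dataType": "none"},
        {"columnName": "capital_gain", "columnType": "int", "dataType": "none"},
        {"columnName": "capital_loss", "columnType": "int", "dataType": "none"},
        {"columnName": "hours_per_week", "columnType": "int", "dataType": "none"},
        {"columnName": "native_country", "columnType": "string", "dataType": "none"},
        {"columnName": "income", "columnType": "string", "dataType": "none"}
      ],
      "dsType": "IN",
      "nodeInstanceId": 18788
    },
    {
      "topic": "mlaas_writeTopic",
      "brokerList": "10.27.189.238:9092,10.27.189.239:9092",
      "dsName": "mlp.zpcstreamingresult",
      "dsType": "OUT",
      "nodeInstanceId": 18793
    }
  ]
}

Note that StreamingExecutePref also reads an "id" field (for ssmpJobId), which does not appear in this example argument.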

Test helper classes:

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import scala.io.Source
import scala.reflect.io.Path
/**
  * Created by zpc on 2017/2/23.
  */
object KafkaProducer {

  def main(args: Array[String]): Unit = {
    val BROKER_LIST = "10.27.189.238:9092,10.27.189.239:9092"
    val TARGET_TOPIC = "mlaas_event" //"new"
    val DIR = "/root/Documents/"

    /**
      * 1. Configure properties
      * metadata.broker.list : the brokers of the Kafka cluster; listing two is sufficient
      * serializer.class : how to serialize the messages being sent
      * request.required.acks : 1 means the broker must acknowledge the message after receiving it; the default is 0
      * producer.type : the default is synchronous (sync)
      */
    val props = new Properties()
    props.put("metadata.broker.list", BROKER_LIST)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("request.required.acks", "1")
    props.put("producer.type", "async")

    /**
      * 2. Create the Producer
      */
    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config)
    while(true){
      val line = "50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K"
      val message = new KeyedMessage[String, String](TARGET_TOPIC, line)
      println(line)
      producer.send(message)
      Thread.sleep(2000)
    }

  }

}



import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by zpc on 2017/2/23.
*/
object KafkaReader {

def main(args: Array[String]): Unit = {
val brokers = "10.27.189.238:9092,10.27.189.239:9092"
// val topics = "mlaas_event"
val topics = "mlaas_writeTopic"

val sparkconf = new SparkConf().setAppName("kafkastreaming").setMaster("local[2]")
val ssc = new StreamingContext(sparkconf,Seconds(20))

// ssc.checkpoint("w_checkpoints") // Windows path

val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)


//{"@timestamp":"2016-12-14T16:26:21.746Z","beat":{"hostname":"root","name":"root","version":"5.1.1"},"metricset":{"module":"system","name":"process","rtt":28025},"system":{"process":{"cmdline":""C:\WINDOWS\system32\SearchFilterHost.exe" 0 624 628 644 8192 632 ","cpu":{"start_time":"2016-12-14T16:24:15.240Z","total":{"pct":0.000000}},"memory":{"rss":{"bytes":7495680,"pct":0.000400},"share":0,"size":1806336},"name":"SearchFilterHost.exe","pgid":0,"pid":8776,"ppid":2524,"state":"running","username":"NT AUTHORITY\SYSTEM"}},"type":"metricsets"}
val lines = KafkaUtils.createDirectStream[String, String,StringDecoder, StringDecoder](ssc,kafkaParams,topicSet)
//val message = lines.map(_._1) // map(_._1) is the message key, which is empty (null) here
val message = lines.map(_._2) //map(_._2) is the actual data written into Kafka
val words = message.flatMap(_.split(","))

val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
//message.print() checked

ssc.start()
ssc.awaitTermination()
}
}
 
 
Original article: https://www.cnblogs.com/drawwindows/p/6519931.html