Spark Pipeline

一、Spark Pipeline

1.1 机器学习管道(Pipeline)

由一系列阶段构成,每个阶段是Transformer或Estimator,它们串联到一起按照顺序执行。

1.2 数据管道组件构成

Transformer:算法可以把一个DataFrame转换成另一个DataFrame。

  • 特征转换器(feature transformer),读取输入数据集中的一列(比如text),将产生新的特征列。
  • 学习模型(learning model),将一个有特征列的DataFrame转换成一个有预测信息的DataFrame。

EStimator:Estimator就是一种机器学习算法,会从输入数据中进行学习,并产生一个训练模型(Transformer)。

1.3 构建Pipeline

val training = spark.createDataFrame(Seq(
	(0L,"a b c d e spark",1.0),
	(1L,"b d",0.0),
	(2L,"spark f g h",1.0),
	(3L,"hadoop mapreduce",0.0)
)).toDF("id","text","label")

//配置ML Pipeline,包含三部分:tokenizer,hashingTF,lr。
val tokenizer = nwe Tokenizer()
	.setInputCol("text")
	.setOutputCol("words")
val hashingTF = new HashingTF()
	.setNumFeatures(1000)
	.setInputCol(tokenizer.getOutputCol)
	.setOutputCol("features")
val lr = new LogisticRegression()
	.setMaxIter(10)
	.setRegPaam(0.001)
val pipeline = new Pipeline()
	.setStages(Array(tokenizer,hashingTF,lr))

val model = pipeline.fit(training)

可以将已经fit后的操作,存入磁盘。

model.write.overwrite().save("/tmp/spark-logistic-regression-model")

1.4 预测Pipeline

val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

val rawdata = spark.createDataFrame(Seq(
	(4L,"spark i j k"),
	(5L,"l m n"),
	(6L,"spark hadoop spark"),
	(7L,"apache hadoop")
)).toDF("id","text")
model.transform(rawdata)
	.select("id","text","probability","prediction")
	.collect()
	.foreach { case Row(id:Long,text:String,prob:Vector,prediction:Double)=>
		println(s"($id,$text)--> prob=$prob,prediction=$prediction)
}
原文地址:https://www.cnblogs.com/aixing/p/13327222.html