SparkMLLib的简单学习

一. 简介

1. 机器学习中,可以将数据划分为连续数据和离散数据

a. 连续数据:可以取任何值,如房价

b. 离散数据:仅有少量特殊值,如一个房屋有2个或3个房间,但不能为2.75个房间

二. 创建向量

1. 向量中的各个维度称为特征

2. Spark中既有局部向量、矩阵,也有分布式矩阵。分布式矩阵由1个多个RDD支持。局部向量有数值型索引和双精度浮点值,且存储在单一机器上。

3. MLlib中有2种类型局部向量:dense和sparse。

a. 稠密型向量(dense)表示为一个数组,如[4500, 41000, 4]

b. 稀疏型向量(sparse)则由两个平行数组构成,其中一个表示索引,另一个表示元素值,如(3, [0, 1, 2], [4500.0, 41000.0, 4.0])

c. 将向量创建为dense或sparse,依赖于数据中有多少null或0。当10000个数据中有9000个数据均为0,若使用稠密型向量,则会浪费空间。

d. 稀疏型向量较常见,Spark原生支持libsvm格式,每一行存储一个特征。

4. 示例

a. 显示导入MLlib向量:

三. 计算相关性

1. 相关性表示两个变量的统计上的关系,如当一个变量发生变化,另一个变量也会变化。相关性分为如下两种:

a. 正相关:一个变量的增加导致另一个变量的增加

b. 负相关:一个变量的增加导致另一个变量的减少

2. Spark支持两种相关性算法:Pearson和Spearman

a. Pearson作用于两个连续值,如一个人的身高和体重,或一个房子的面积和价格

b. Spearman处理一个连续值和一个分类值,如zip码和房间价格

四. 理解特征工程

1. 数据清洗(数据准备)和特征提取在数据处理中耗费的时间最多。

2. 特征选取考虑两个主要方面:特征质量和特征数量

3. 模型不关心标称特征,但对比例或数字特征非常敏感。在很多真实建模场景中,可能存在成百上千的规模差异。特征规模伸缩有如下方法:

a. 将特征值除以最大值,使得每个特征都在特定范围内

b. 将特征除以一个区间,即最大值-最小值

c. 通过平均值减去特征值,然后除以区间

d. 通过平均值减去特征值,然后除以标注偏差

4. TF-IDF用于反映一个词在文档集中的重要程度。其计算一个词的IT-IDF的伪计算公式可表示为:

    TF-IDF = TF*IDF = 在某一文档中词条出现次数/该文档中所有的词条数目 * log(语料库的文档总数 / 包含该词条的文档数 + 1)

示例:com.ws.spark.study.ml.MLTest "run tf-idf"

五. 理解Spark ML

1. 机器的学习过程主要有:

a. 为机器学习算法提供训练数据集以及超参数

b. 训练的结果是一个模型

c. 然后用模型预测测试数据

2. Spark ML中,一个预估器以DataFrame方式提供(通过fit方法),训练后输出是一个Transformer。Transformer以DateFrame作为输入,并且

输出一个转换后的DataFrame。Transformer不只限于预测模型,也可用于特征转换。

3. 机器学习pipeline定义为阶段(stage)序列,每个阶段可以是预估器(estimator)或转换器(Transformaer)

示例:com.ws.spark.study.ml.MLTest "test LogisticRegression"

六. 理解超参数调整

1. 每一个机器学习算法在开始训练前,均需要设置一些超参数,如步长、学习率、回归参数等。这些超参数一般手工设置。

示例:com.ws.spark.study.ml.MLTest "test hyperparameter tuning"

七. MLLib监督学习-回归

1. 监督学习分为两类:回归和分类

a. 回归:预测连续值的输出,如房价

b. 分类:预测离散值(二值或多值)的输出(label),例如是否为垃圾邮件,或邮件标记为重要,紧急,不重要等

2. 回归的流程一般包含:

1) 获取标记数据

a. 标记数据如何获取取决于使用场景

b. 标记数据量需要大于特征数,如果过小的话,导致过拟合。

2) 将标记数据划分两部分

a. 将数据按特定比例随机划分,且每次划分时均需随机,以避免偏见

b. 划分的第一部分为训练集,第二部分为测试集。有时也会分为三部分:训练集、交叉验证、测试集,此时测试集只用于测量准确度

3) 使用算法训练数据集。训练后结果称为模型。模型的训练和创建也涉及到超参数(简单理解为配置参数)调整。

4) 使用训练好的模型对测试集预测

3. 仅有一个特征进行预测时,称为二变量分析;当有多个特征时,称为多变量分析。实际上,我们可以有任意数量的特征,例如SVM允许无穷的特征量。

示例:线性回归 com.ws.spark.study.ml.MLTest "test linear regression"

4.. 理解损失函数

1) 线性回归中的损失函数:均方误差 -> J(θ0,θ1) = sum((hi(x) - yi)^2) / 2m,其中θ0表示y轴截距,θ1表示斜率,hi(x)表示第i个元素x对应的预测值

   yi表示第i个元素对应的真实值

2) 求取线性损失函数获取最优值,采用梯度下降方法。在Spark中的实现是随机梯度下降(stochastic gradient descent)。梯度下降是众多爬山演算法中的一部分,

   另一个算法在Spark中引入的是限制内存尺度法(limited-memory BFGS)。这些算法用于寻找一个函数梯度为0的优化点。

5. Lasso线性回归

1) Lasso是线性回归收缩和选择方法,通常使用系数绝对值之和的上限来最小化平方误差和。之前使用的是普通最小二乘(ordinary least

   squares),OLS存在两大挑战:

a. 预测精度:使用OLS的预测通常由小的预测偏差和大的方差。可通过收缩系数(甚至将其设置为0)提高预测精度,代价就是增加偏差

b. 解释性:由于有大量的预测因子可供使用,希望找到具有最强效果的子集(相关性)

2) 偏差是预测结果与实际值的距离的估计值,方差是不同预测值之间预测值差异的估计值。一般的增加特征维度,可以减少偏差。

   减少特征维度或增加数据集可以减少方差

3) Lasso的主要特性在于:任何认定为无用特征,Lasso均将其系数设置为0,来将其移出等式。

示例:com.ws.spark.study.ml.MLTest "test lasso regression"

6. ridge线性回归

1) 与Lasso将特征系数设置为0,而ridge,特征将会被处罚,但不会设置为0

示例:com.ws.spark.study.ml.MLTest "test ridge regression"

八. MLLib监督学习-分类

1. 线性回归在回归任务中表现较好,但在分类任务中存在的限制如下:

a. 拟合过程很容易受到异常值的影响

b. 无法保证假设函数满足0-1范围

2. 逻辑回归保障假设函数位于0-1,即 0 <= h(x) <= 1。逻辑回归表示为h(x) = g(θx),其中g为sigmoid函数,定义为g(t) = 1 / (1 + e^-t)

3. Spark MLLib中有两个类支持逻辑回归:LogisticRegressionWithSGD和LogisticRegressionWithLBFGS

示例:com.ws.spark.study.ml.MLTest "test logistic regression"

4. SVM

示例:com.ws.spark.study.ml.MLTest "test svm algorithm"

5. 决策树

1) 决策时的优点:

a. 容易理解和解释

b. 分类和连续特征均起作用

c. 缺失特征也起作用

d. 不需要特征伸缩

2) Spark使用三种方法来决定异质:基尼异质(Gini impurity,用于分类),熵(用于分类),方差(用于回归)

示例:

a. com.ws.spark.study.ml.MLTest "test decision tree"

b. com.ws.spark.study.ml.MLTest "test decision tree2

6. 随机森林

1) 有时,一个决策树并不够,需要一组决策树来提供更强大的模型,这被称为集成学习算法。典型地有随机森林。

2) 随机森林提供K个树,为每个树提供训练数据的随机子集S。每个树仅使用特征的子集。当需要进行预测时,会在树之间进行多数投票,

   并将投票结果作为预测结果。

3) 随机森林的作用方式体现在两方面的随机选取:数据的子集和拆分数据的特征子集

示例:com.ws.spark.study.ml.MLTest "test random forest"

7. 梯度提升树GBT

1) 另一个集成学习算法是梯度提升树(GBT)。GBT每次训练一个棵树,其中每一颗新树改进了先前训练的树的缺点。

2) 由于GBT每次只训练一棵树,因此相比随机森林算法,其耗时更长。

3) 注意:目前Spark MLLib 2.2.4中的GBT仅支持二值分类,不支持多值分类

示例:com.ws.spark.study.ml.MLTest "test GBT"

8. 朴素贝叶斯

1) 朴素贝叶斯假设特征之间是相互独立的

示例:com.ws.spark.study.ml.MLTest "test native bayes"

九. 无监督学习

1. 与有监督学习不同的是,无监督学习没有对应的标注数据,需要算法基于自身找到一个结构。

2. 无监督学习最常见的就是聚类。聚类是将数据分为多个组,每个组中的数据彼此相似。

3. K-means算法是常用的聚类算法

a. 首先随机选择两个点作为聚类质心

b. 群集分配:将遍历每个数据点,并将其分配到更接近的质心,并作为所呈现的聚类中一部分

c. 移动质心:会将质心移动到聚类中的数据点的均值位置

4. PCA降维

1) 降维即降低特征的维度,可用于数据压缩和可视化。降维可以提高算法效率,减少磁盘内存占用,也可以将高度相关维度减少到1个。

2) 当将1000个特征投影为100个特征时,这100个特征可能无法给出真实的含义。

3) 使用PCA前,可能需要进行特征归一化,即将不同量级的特征归一化到同等量级。

4) 常见有4种简单方法进行特征归一化

a. 将特征值除以最大值,将每个特征置于-1≤x≤1范围内

b. 将特征值除以范围,即最大值 - 最小值

c. 通过平均值减去特征值,然后将其除以范围

d. 通过平均值减去特征值,然后将其除以标准偏差

5. 奇异值分解降维

1) 基本思想是采用高维度,高度可变的数据点集合,并将其缩小到低维空间,从而更清楚地暴露原始数据的结构,并从最小量的变化中对其进行排序。

2) 奇异值分解常用语NLP中。SVD基于线性代数的鼎力,矩形矩阵A可以分解为三个矩阵的乘积:正交矩阵U,对角矩阵S和正交矩阵V

参考代码:

package com.ws.spark.study.ml

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FlatSpec}

class MLTest extends FlatSpec with BeforeAndAfterAll {

  private var sc: SparkContext = null
  private var session: SparkSession = null

  private val BASE_DATA_DIR = "E:/IntelliJWorkSpace/sparkstudy/data"

  override protected def beforeAll(): Unit = {
    sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("test"))
    session = SparkSession.builder().appName("testCorrelation").master("local[1]").getOrCreate()
  }

  override protected def afterAll(): Unit = {
    sc.stop()
    session.stop()
  }

  def testSpark(): Unit ={
    println(sc.version)
    sc.parallelize(List(1,2,3,4)).foreach(f => {
      val v = f + 10
      println(v)
    })
    sc.stop()
  }

  /**
    * 计算向量
    */
  def testVector(): Unit ={
    // 显示导入MLlib向量
    import org.apache.spark.ml.linalg.Vectors
    // 创建稠密型向量
    val denseHouse = Vectors.dense(4500d, 41000d, 4d)
    // 创建稀疏型向量
    val sparseHouse = Vectors.sparse(3, Array(0, 1, 2), Array(4500d, 41000d, 4d))
    // 创建为值0的向量
    val zeros = Vectors.zeros(3)
  }

  /**
    * 测试特征的相关性
    */
  "The correlation of price and size " should "> 0.85" ignore {
    val houses = session.createDataFrame(Seq(
      (1620000d,2100),
      (1690000d,2300),
      (1400000d,2046),
      (2000000d,4314),
      (1060000d,1244),
      (3830000d,4608),
      (1230000d,2173),
      (2400000d,2750),
      (3380000d,4010),
      (1480000d,1959)
    )).toDF("price", "size")

    // 计算corr时,默认使用pearson算法, corr提供了指定算法的参数
    // 0.85表示2个特征有很强的正相关
    assert(houses.stat.corr("price", "size") > 0.85)
    assert(houses.stat.corr("price", "size", "pearson") > 0.85)

  }

  /**
    * 测试TF-IDF
    */
  "run tf-idf" should "success" ignore {
    import org.apache.spark.ml.feature._
    // 数据加载
    //    val sentenceData = session.read.option("delimiter", "
").csv("file://test.csv").toDF("sentence")
    val sentenceData = session.read.json(s"file:///$BASE_DATA_DIR/video.json").toDF("name")
    // 创建转换器对句子分词
    val tokenizer = new Tokenizer().setInputCol("name").setOutputCol("words")
    // 对句子分词
    val wordsData = tokenizer.transform(sentenceData)
    // 创建哈希转换器, setNumFeatures表示设置hash的维数
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
    // 将句子中的单词映射为词频
    val featurizedData = hashingTF.transform(wordsData)
    // 创建IDF评估器
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    // 训练IDF模型
    val idfModel = idf.fit(featurizedData)
    // 基于IDF模型重新调整TF模型
    val rescaleData = idfModel.transform(featurizedData)
    rescaleData.select("rawFeatures").limit(10).show()
  }

  /**
    * 测试预估器和转换器
    */
  "test LogisticRegression" should "success" ignore {
    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.linalg.{Vectors, Vector}
    // 创建球员信息
    val lebron = (1.0, Vectors.dense(80.0, 250.0))
    val tim = (0.0, Vectors.dense(70.0, 150.0))
    val brittany = (1.0, Vectors.dense(80.0, 207.0))
    val stacey = (0.0, Vectors.dense(65.0, 120.0))

    // 创建训练DataFrame
    val tranning = session.createDataFrame(Seq(lebron, tim, brittany, stacey))
      .toDF("label", "features")
    // 创建逻辑回归预估器
    val estimator = new LogisticRegression()
    // 通过匹配预估器与训练DataFrame来创建一个转换器
    val transformer = estimator.fit(tranning)
    // 创建测试数据
    val john = Vectors.dense(90.0, 270.0)
    val tom = Vectors.dense(62.0, 120.0)
    val test = session.createDataFrame(Seq((1.0, john), (0.0, tom))).toDF("label", "features")

    // 使用转换器预测
    val results = transformer.transform(test)
    // 打印结果DataFrame的scehma,可以看到,处理prediction,转换器增加了rawPrediction和概率两个列
    results.printSchema()
    results.show()
    // 只显示features和prediction
    val predictions = results.select("features", "prediction")
    predictions.show()
  }

  /***
    * 测试超参数调整
    */
  "test hyperparameter tuning" should "success" ignore {
    import org.apache.spark.ml.regression.LinearRegression
    import org.apache.spark.ml.evaluation.RegressionEvaluator
    import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

    val data = session.read.format("libsvm").load(s"file:///$BASE_DATA_DIR/iris_libsvm.txt")
    // 将数据拆分为训练数据和预测数据
    val Array(training, test) = data.randomSplit(Array(0.7, 0.3), seed = 10000)
    // 初始化线性回归
    val lr = new LinearRegression().setMaxIter(10)
    // 使用ParamGridBuilder创建参数网格来搜索。TrainValidationSplit将尝试所有的数值组合
    // 以决定使用evaluator最好的模型
    val paramGrid = new ParamGridBuilder()
      .addGrid(lr.regParam, Array(0.1, 0.01))
      .addGrid(lr.fitIntercept)
      .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
      .build()

    // 创建训练验证拆分
    // TrainValidationSplit需要一个Estimator, 一组Estimator ParamMap和一个Evaluator
    val trainValidationSplit = new TrainValidationSplit()
      .setEstimator(lr)
      .setEvaluator(new RegressionEvaluator())
      .setEstimatorParamMaps(paramGrid)
      .setTrainRatio(0.8) // 80%的数据用于训练,剩余的20%用于预测

    // 训练模型,选择最好的一组参数
    val model = trainValidationSplit.fit(training)
    // 基于测试集预测
    val predictions = model.transform(test)
    predictions.select("features", "label", "prediction").show()
    // 评估预测结果
    val evaluator = new RegressionEvaluator()
    println(evaluator.evaluate(predictions))
  }

  /**
    * 线性回归
    */
  "test linear regression" should "success" ignore {
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.regression.LinearRegression

    // 房价作为label
    val points = session.createDataFrame(Seq(
      (1620000, Vectors.dense(2100)),
      (1690000, Vectors.dense(2300)),
      (1400000, Vectors.dense(2046)),
      (2000000, Vectors.dense(4314)),
      (1060000, Vectors.dense(1244)),
      (3830000, Vectors.dense(4608)),
      (1230000, Vectors.dense(2173)),
      (2400000, Vectors.dense(2750)),
      (3380000, Vectors.dense(4010)),
      (1480000, Vectors.dense(1959))
    )).toDF("label", "features")
    // 初始化线性回归
    val lr = new LinearRegression()
    // 使用数据集训练
    val model = lr.fit(points)
    // 创建测试数据
    val test = session.createDataFrame(Seq(Vectors.dense(2100.0)).map(Tuple1.apply)).toDF("features")
    // 预测
    val predictions = model.transform(test)
    predictions.show()
  }
  "test linear regression2" should "success" ignore {
    import org.apache.spark.ml.regression.LinearRegression
    import org.apache.spark.ml.evaluation.RegressionEvaluator

    val data = session.read.format("libsvm").load("s3a://sparkcookbook/housingdata/realestate.libsvm")
    val Array(trainning, test) = data.randomSplit(Array(0.7, 0.3))
    val lr = new LinearRegression()
    val model = lr.fit(trainning)
    val predictions = model.transform(test)
    val evaluator = new RegressionEvaluator()
    evaluator.evaluate(predictions)
  }

  /**
    * 测试线性回归中的Lasso算法
    */
  "test lasso linear regression" should "success" ignore{
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.regression.LinearRegression

    val points = session.createDataFrame(Seq(
      (1d, Vectors.dense(5.0,3,1,2,1,3,2,2,1)),
      (2d, Vectors.dense(9.0,8,8,9,7,9,8,7,9))
    )).toDF("label", "features")

    // 将setElasticNetParam设置1表示Lasso或L1归一化
    val lr = new LinearRegression().setMaxIter(10).setFitIntercept(false).setRegParam(.3).setElasticNetParam(1)
    val model = lr.fit(points)
    // [0.18531922804008738,0.013616539127918346,0.0,0.0,0.0,0.009917465418232699,0.0,0.0,0.0]
    println(model.coefficients)
  }
  /**
    * 测试线性回归中的Ridge算法
    */
  "test ridge linear regression" should "success" ignore{
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.regression.LinearRegression

    val points = session.createDataFrame(Seq(
      (1d, Vectors.dense(5.0,3,1,2,1,3,2,2,1)),
      (2d, Vectors.dense(9.0,8,8,9,7,9,8,7,9))
    )).toDF("label", "features")

    // 将setElasticNetParam设置1表示Lasso或L1归一化
    val lr = new LinearRegression().setMaxIter(10).setFitIntercept(false).setRegParam(.3).setElasticNetParam(0)
    val model = lr.fit(points)
    // [0.11329331633450115,0.03937073300046667,0.002369276442275244,0.010416987598811298,0.0043289885742031475,0.026236646722551396,0.015282817648377314,0.023597219133656366,0.0011928984792447094]
    println(model.coefficients)
  }

  /***
    * 测试逻辑回归
    */
  "test logistic regression" should "success" ignore {
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.classification.{LogisticRegression, BinaryLogisticRegressionSummary}

    val trainingDataSet = session.createDataFrame(Seq(
      (0.0,Vectors.dense(0.245)),
      (0.0,Vectors.dense(0.247)),
      (1.0,Vectors.dense(0.285)),
      (1.0,Vectors.dense(0.299)),
      (1.0,Vectors.dense(0.327)),
      (1.0,Vectors.dense(0.347)),
      (0.0,Vectors.dense(0.356)),
      (1.0,Vectors.dense(0.36)),
      (0.0,Vectors.dense(0.363)),
      (1.0,Vectors.dense(0.364)),
      (0.0,Vectors.dense(0.398)),
      (1.0,Vectors.dense(0.4)),
      (0.0,Vectors.dense(0.409)),
      (1.0,Vectors.dense(0.421)),
      (0.0,Vectors.dense(0.432)),
      (1.0,Vectors.dense(0.473)),
      (1.0,Vectors.dense(0.509)),
      (1.0,Vectors.dense(0.529)),
      (0.0,Vectors.dense(0.561)),
      (0.0,Vectors.dense(0.569)),
      (1.0,Vectors.dense(0.594)),
      (1.0,Vectors.dense(0.638)),
      (1.0,Vectors.dense(0.656)),
      (1.0,Vectors.dense(0.816)),
      (1.0,Vectors.dense(0.853)),
      (1.0,Vectors.dense(0.938)),
      (1.0,Vectors.dense(1.036)),
      (1.0,Vectors.dense(1.045)))
    ).toDF("label", "features")
    val lr = new LogisticRegression()
    val model = lr.fit(trainingDataSet)
    // 创建训练总结
    val traningSummary = model.summary
    // 强制转换为二值逻辑回归总结
    val binarySummary = traningSummary.asInstanceOf[BinaryLogisticRegressionSummary]
    // 打印ROC(Receiver Operating Characteristic)下的区域,ROC是一个用于访问预测精度的统计工具
    println(binarySummary.areaUnderROC)
  }

  /**
    * SVM算法
    */
  "test svm algorithm" should "success" ignore {
    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.mllib.classification.SVMWithSGD

    val data = MLUtils.loadLibSVMFile(sc, s"file:///$BASE_DATA_DIR/diabetes.libsvm")
    // 统计数据量
    assert(768 === data.count())
    // 将数据拆分为训练和测试
    val trainingAndTest = data.randomSplit(Array(0.5, 0.5))
    val trainingData = trainingAndTest(0)
    val testData = trainingAndTest(1)

    // 训练算法,构建模型的迭代次数为100
    // 可以设置不同的迭代次数,但在一个确定的拐点时,你将看到结果开始收敛
    val model = SVMWithSGD.train(trainingData, 100)
    // 结果第一条测试数据
    val label = model.predict(testData.first().features)
    println(s"${testData.first().features}	$label")

    // 预测整个测试集,输出(测试label, 真实label)
    val predictsAndLabels = testData.map(f => (model.predict(f.features), f.label))
    println(predictsAndLabels.filter(p => p._1 != p._2).count())
  }

  /**
    * 决策树
    */
  "test decision tree" should "success" ignore {
    import org.apache.spark.mllib.tree.DecisionTree
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.tree.configuration.Algo._
    import org.apache.spark.mllib.tree.impurity.Entropy

    // 获取训练数据,并封装在LabeledPoint
    val data = sc.parallelize(Seq(
      (0.0,1.0,1.0,2.0),
      (0.0,1.0,1.0,1.0),
      (0.0,1.0,1.0,0.0),
      (0.0,0.0,1.0,2.0),
      (0.0,0.0,1.0,0.0),
      (1.0,0.0,0.0,2.0),
      (1.0,0.0,0.0,1.0),
      (0.0,0.0,0.0,0.0)
    )).map(f => LabeledPoint(f._1, Vectors.dense(Array(f._2, f._3, f._4))))
    // 训练模型
    val model = DecisionTree.train(data, Classification, Entropy, 3)
    // 创建测试向量
    val v = Vectors.dense(0.0, 1.0, 0.0)
    assert(0.0 === model.predict(v))
  }
  // 使用ml下的决策树算法
  "test decision tree2" should "success" ignore {
    import org.apache.spark.ml.classification.DecisionTreeClassifier
    import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

    val data = session.read.format("libsvm").option("inferschema", "true")
      .load(s"file:///$BASE_DATA_DIR/diabetes.libsvm")
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
    val dt = new DecisionTreeClassifier().setImpurity("entropy")
    val model = dt.fit(trainingData)
    val predictions = model.transform(testData)
    val evaluator = new BinaryClassificationEvaluator()
    val auroc = evaluator.evaluate(predictions)
    println(s"Area under ROC = $auroc")
  }

  /**
    * 测试随机森林
    */
  "test random forest" should "success" ignore {
    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

    val data = session.read.format("libsvm").load(s"$BASE_DATA_DIR/rf.libsvm")
    val Array(training, test) = data.randomSplit(Array(0.7, 0.3))
    val rf = new RandomForestClassifier().setNumTrees(3)
    val model = rf.fit(training)
    val predictions = model.transform(test)
    val evaluator = new MulticlassClassificationEvaluator().setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println(accuracy)
    println(model.toDebugString)
  }

  /**
    * 测试梯度提升树
    */

  "test GBT" should "success" ignore {
    import org.apache.spark.ml.classification.GBTClassifier
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

    val data = session.read.format("libsvm").load(s"file:///$BASE_DATA_DIR/diabetes.libsvm")
    val Array(training, test) = data.randomSplit(Array(0.7, 0.3))
    val gbt = new GBTClassifier().setMaxIter(10)
    val model = gbt.fit(training)
    val predictions = model.transform(test)
    val evaluator = new MulticlassClassificationEvaluator().setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println(accuracy)
  }

  /**
    * 朴素贝叶斯
    */
  "test native bayes" should "success" in {
    import org.apache.spark.ml.classification.NaiveBayes
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

    val data = session.read.format("libsvm").load(s"file:///$BASE_DATA_DIR/diabetes.libsvm")
    val Array(training, test) = data.randomSplit(Array(0.7, 0.3))
    val model = new NaiveBayes().fit(training)
    val predictions = model.transform(test)
    val evaluator = new MulticlassClassificationEvaluator()
    println(evaluator.evaluate(predictions))
  }

  /**
    * 测试K-means
    */
  "test k-means" should "success" in {
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.ml.clustering.KMeans

    val data = session.createDataFrame(
      Seq(
      Vectors.dense(12839,2405),
      Vectors.dense(10000,2200),
      Vectors.dense(8040,1400),
      Vectors.dense(13104,1800),
      Vectors.dense(10000,2351),
      Vectors.dense(3049,795),
      Vectors.dense(38768,2725),
      Vectors.dense(16250,2150),
      Vectors.dense(43026,2724),
      Vectors.dense(44431,2675),
      Vectors.dense(40000,2930),
      Vectors.dense(1260,870),
      Vectors.dense(15000,2210),
      Vectors.dense(10032,1145),
      Vectors.dense(12420,2419),
      Vectors.dense(69696,2750),
      Vectors.dense(12600,2035),
      Vectors.dense(10240,1150),
      Vectors.dense(876,665),
      Vectors.dense(8125,1430),
      Vectors.dense(11792,1920),
      Vectors.dense(1512,1230),
      Vectors.dense(1276,975),
      Vectors.dense(67518,2400),
      Vectors.dense(9810,1725),
      Vectors.dense(6324,2300),
      Vectors.dense(12510,1700),
      Vectors.dense(15616,1915),
      Vectors.dense(15476,2278),
      Vectors.dense(13390,2497.5),
      Vectors.dense(1158,725),
      Vectors.dense(2000,870),
      Vectors.dense(2614,730),
      Vectors.dense(13433,2050),
      Vectors.dense(12500,3330),
      Vectors.dense(15750,1120),
      Vectors.dense(13996,4100),
      Vectors.dense(10450,1655),
      Vectors.dense(7500,1550),
      Vectors.dense(12125,2100),
      Vectors.dense(14500,2100),
      Vectors.dense(10000,1175),
      Vectors.dense(10019,2047.5),
      Vectors.dense(48787,3998),
      Vectors.dense(53579,2688),
      Vectors.dense(10788,2251),
      Vectors.dense(11865,1906)
    ).map(Tuple1.apply)).toDF("features")
    // 创建k-means预估器
    val kmeans = new KMeans().setK(4).setSeed(1L)
    // 模型训练
    val model = kmeans.fit(data)
    val test = session.createDataFrame(
      Seq(
        Vectors.dense(876, 665),
        Vectors.dense(15750,1120),
        Vectors.dense(38768,2725),
        Vectors.dense(69696,2750)
      ).map(Tuple1.apply)).toDF("features")
    val prediction = model.transform(test)
    println(prediction.show())
  }

  /**
    * 1. 使用PCA将 house size 和 lot size 合并为一个特征 房屋的z密度
    * 2. 通过平均值减去特征值,然后将其除以标准偏差,以进行特征归一化
    * 3. 使用线性回归来观察z密度对房价的影响程度
    */
  "test pca" should "success" ignore {
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed._

    val data = sc.textFile("s3a://sparkcookbook/saratoga/scaledhousedata.csv")
    val parsedData = data.map(line => Vectors.dense(line.split(",").map(_.toDouble)))
    val mat:RowMatrix = new RowMatrix(parsedData)
    // 计算一个主成分
    val pc = mat.computePrincipalComponents(1)
    val projected = mat.multiply(pc)
    val projectedRDD = projected.rows
    projectedRDD.saveAsTextFile("phdata")
  }

  /**
    * 奇异值分解
    */
  "test svd" should "success" in {
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    val data = sc.parallelize(Seq(
      Vectors.dense(1,2),
      Vectors.dense(2,3),
      Vectors.dense(1,4),
      Vectors.dense(1,0),
      Vectors.dense(1,0),
      Vectors.dense(1,3),
      Vectors.dense(1,2),
      Vectors.dense(1,0),
      Vectors.dense(1,2),
      Vectors.dense(0,3),
      Vectors.dense(0,1),
      Vectors.dense(0,2)
    ))
    val mat = new RowMatrix(data)
    val svd = mat.computeSVD(2, true)
    val u = svd.U
    val s = svd.s
    val v = svd.V
    println(s"$u
$s
$v")
  }

}
View Code
原文地址:https://www.cnblogs.com/mengrennwpu/p/11301290.html