Spark MLlib之线性回归源代码分析

1.理论基础

    线性回归(Linear Regression)问题属于监督学习(Supervised Learning)范畴,又称分类(Classification)或归纳学习(Inductive Learning);这类分析中训练数据集中给出的数据类标是确定的。机器学习的目标是,对于给定的一个训练数据集,通过不断的分析和学习产生一个联系属性集合和类标集合的分类函数(Classification Function)或预測函数(Prediction Function),这个函数称为分类模型(Classification Model)或预測模型(Prediction Model);通过学习得到的模型能够是一个决策树,规格集,贝叶斯模型或一个超平面。通过这个模型能够对输入对象的特征向量预測或对对象的类标进行分类。

   回归问题中通常使用最小二乘(Least Squares)法来迭代最优的特征中每一个属性的比重,通过损失函数(Loss Function)或错误函数(Error Function)定义来设置收敛状态,即作为剃度下降算法的逼近參数因子。

2.矩阵向量运算库jblas介绍

    因为spark MLlib中使用jlbas的线性代数运算库,因此学习和掌握jlbas库中主要的运算,对分析和学习spark中MLlib非常多算法非常有帮助。以下使用jlbas中DoubleMatrix矩阵对jlbas中基本运算进行简介:

    val matrix1 = DoubleMatrix.ones(10, 1) //创建全部值为1的10*1矩阵
    val matrix2 = DoubleMatrix.zeros(10, 1) //创建全部值为0的10*1矩阵
    matrix1.put(1, -10)
    val absSum = matrix1.norm1() //绝对值之和
    val euclideanNorm = matrix1.norm2() //欧几里德距离
    val matrix3 = (matrix1.addi(matrix2))
    val matrix4 = new DoubleMatrix(1, 10, (1 to 10).map(_.toDouble): _*) //创建Double向量对象
    println("print init value:matrix3=" + matrix3)
    println("print init value:matrix4=" + matrix4)
    println("matrix sub matrix:" + matrix3.sub(matrix4) + "," + matrix4.sub(10)) //减法运算
    println("matrix add matrix:" + matrix3.add(matrix4) + "," + matrix4.add(10)) //加法运算
    println("matrix mul matrix:" + matrix3.mul(matrix4) + "," + matrix4.mul(10)) //乘法运算
    println("matrix div matrix:" + matrix3.div(matrix4) + "," + matrix4.div(10)) //除法运算
    println("matrix dot matrix:" + matrix3.dot(matrix4)) //向量积

    val matrix5 = DoubleMatrix.ones(10, 10)
    println("N*M Vector Matrix sub OP:
" + matrix5.subRowVector(matrix4) + "
" + matrix5.subColumnVector(matrix4)) //多对象减法运算
    println("N*M Vector Matrix add OP:
" + matrix5.addRowVector(matrix4) + "
" + matrix5.addColumnVector(matrix4)) //多对象加法运算
    println("N*M Vector Matrix mul OP:
" + matrix5.mulRowVector(matrix4) + "
" + matrix5.mulColumnVector(matrix4)) //多对象乘法运算
    println("N*M Vector Matrix div OP:
" + matrix5.divRowVector(matrix4) + "
" + matrix5.divColumnVector(matrix4)) //多对象除法运算

3.梯度下降(Gradient Descent)算法介绍

    梯度下降算法用于在迭代过程中逐渐降阶,不断更新特征权重向量,从而得到无限接近或拟合的最优特征权重向量 ;梯度下降算法主要有两种,第一种是批量梯度下降(Batch Gradient Descent)算法。此种方式实现过程是对权重向量进行累加,然后批量更新的一种方式,一般不有用于大规模数据集处理;第二种是随机梯度下降(Stochastic Gradient Descent)算法。这样的方式对给定训练数据集中每一个对象都进行权重计算和更新,在某些情况下easy收敛到局部最优解上。Spark MLlib库中主要使用随机梯度下降算法。为了更好的理解MLlib库中随机梯度算法(MLlib库中类后缀名以SGD结尾的全部算法)实现,

   批量梯度下降算法的理论公式:

                        

   随机梯度下降算法的理论公式:

                        

以下是使用随机梯度算法进行直线拟合的Demo:

  def sgdDemo {
    val featuresMatrix: List[List[Double]] = List(List(1, 4), List(2, 5), List(5, 1), List(4, 2))//特征矩阵
    val labelMatrix: List[Double] = List(19, 26, 19, 20)//真实值向量
    var theta: List[Double] = List(0, 0)
    var loss: Double = 10.0
    for {
      i <- 0 until 1000 //迭代次数
      if (loss > 0.01) //收敛条件loss<=0.01
    } {
      var error_sum = 0.0 //总误差
      var j = i % 4
      var h = 0.0
      for (k <- 0 until 2) {
        h += featuresMatrix(j)(k) * theta(k)
      } //计算给出的測试数据集中第j个对象的计算类标签
      error_sum = labelMatrix(j) - h //计算给出的測试数据集中类标签与计算的类标签的误差值
      var cacheTheta: List[Double] = List()

      for (k <- 0 until 2) {
        val updaterTheta = theta(k) + 0.001 * (error_sum) * featuresMatrix(j)(k)
        cacheTheta = updaterTheta +: cacheTheta
      } //更新权重向量
      cacheTheta.foreach(t => print(t + ","))
      print(error_sum + "
")
      theta = cacheTheta
      //更新误差率
      var currentLoss: Double = 0
      for (j <- 0 until 4) {
        var sum = 0.0
        for (k <- 0 until 2) {
          sum += featuresMatrix(j)(k) * theta(k)
        }
        currentLoss += (sum - labelMatrix(j)) * (sum - labelMatrix(j))
      }
      loss = currentLoss
      println("loss->>>>" + loss / 4 + ",i>>>>>" + i)
    }
  }

4.MLlib线性回归源代码分析

     MLlib中可用的线性回归算法有:LinearRegressionWithSGD。RidgeRegressionWithSGD,LassoWithSGD;MLlib回归分析中涉及到的主要类有,GeneralizedLinearAlgorithm。GradientDescent。以下以对LinearRegressionWithSGD实现进行主要分析。

    第一步:在使用LinearRegressionWithSGD算法之前首先将输入数据解析成包括类标和特征向量的LabeledPoint对象的RDD弹性分布式数据集合。

    第二步:调用LinearRegressionWithSGD伴生对象中的train方法传输第一步创建的RDD集合和最大迭代次数,在train中主要实现创建一个新的LinearRegressionWithSGD对象,初始化梯度下降算使用使用最小二乘梯度下降算法SquaredGradient以及更新权重向量使用SimpleUpdater。运行父类GeneralizedLinearAlgorithm中的run方法进行权重向量和拦截參数计算,并返回训练得到的模型属性权重向量

    LinearRegressionWithSGD伴生对象中train方法实现


  def train(
      input: RDD[LabeledPoint],
      numIterations: Int,
      stepSize: Double,//默认步长为1
      miniBatchFraction: Double)//每次跌打使用的batch因子。默觉得1
    : LinearRegressionModel =
  {
    new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
  }

   LinearRegressionWithSGD中run方法实现

  def run(input: RDD[LabeledPoint], initialWeights: Array[Double]) : M = {

    // Check the data properties before running the optimizer
    if (validateData && !validators.forall(func => func(input))) {//预验证输入数据的合法性,validators中存储验证的全部方法
      throw new SparkException("Input validation failed.")
    }
    // Prepend an extra variable consisting of all 1.0's for the intercept.
    val data = if (addIntercept) {//判读是否须要加入intercept
      input.map(labeledPoint => (labeledPoint.label, 1.0 +: labeledPoint.features))
    } else {
      input.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
    }//对象转化为元组(类标签,特征)

    val initialWeightsWithIntercept = if (addIntercept) {
      0.0 +: initialWeights
    } else {
      initialWeights
    }//初始化权重特征

    val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)//返回优化后的权重

    val (intercept, weights) = if (addIntercept) {
      (weightsWithIntercept(0), weightsWithIntercept.tail)
    } else {
      (0.0, weightsWithIntercept)
    }

    logInfo("Final weights " + weights.mkString(","))
    logInfo("Final intercept " + intercept)

    createModel(weights, intercept)//使用计算后的权重向量和截距创建模型
  }


当中optimizer.optimize(data, initialWeightsWithIntercept)是LinearRegressionWithSGD实现的核心。oprimizer的类型为GradientDescent,optimize方法中主要调用GradientDescent伴生对象的runMiniBatchSGD方法,返回当前迭代产生的最优特征权重向量。

GradientDescentd对象中optimize方法实现

  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
    : Array[Double] = {
	//返回优化后的权重向量。和迭代过程中误差
    val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
        data,
        gradient,
        updater,
        stepSize,
        numIterations,
        regParam,
        miniBatchFraction,
        initialWeights)
    weights
  }

GradientDescent伴生对象中runMiniBatchSGD方法实现

  def runMiniBatchSGD(
    data: RDD[(Double, Array[Double])],
    gradient: Gradient,//SquaredGradient—平方剃度下降算法
    updater: Updater,//SimpleUpdater
    stepSize: Double,//1.0
    numIterations: Int,//100
    regParam: Double,//0.0
    miniBatchFraction: Double,//1.0
    initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {

    val stochasticLossHistory = new ArrayBuffer[Double](numIterations)

    val nexamples: Long = data.count()
    val miniBatchSize = nexamples * miniBatchFraction

    // Initialize weights as a column vector//创建一维向量,第一个參数为行数,第二个參数为列数,第三个參数開始为值
    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
    var regVal = 0.0
    
    for (i <- 1 to numIterations) {
      /**
       * 使用平方梯度下降算法
       * gradientSum:本次选择迭代样本的梯度总和,
       * lossSum:本次选择迭代样本的误差总和
       */
      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
        case (y, features) =>//(标签,特征)
          val featuresCol = new DoubleMatrix(features.length, 1, features:_*)
          val (grad, loss) = gradient.compute(featuresCol, y, weights)//(特征,标签。特征属性权重向量)
          /**
           * class SquaredGradient extends Gradient {
			  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): 
			      (DoubleMatrix, Double) = {
			    val diff: Double = data.dot(weights) - label//计算当前计算对象的类标签与实际类标签值之差
			    val loss = 0.5 * diff * diff//当前平方梯度下降值
			    val gradient = data.mul(diff)
			    (gradient, loss)
			  }
			}
           */
          (grad, loss)//当前训练对象的特征权重向量和误差
      }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2)) //计算本次迭代所选训练数据新的特征权重向量之和和误差总和     
      /**
       * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
       * and regVal is the regularization value computed in the previous iteration as well.
       */
      stochasticLossHistory.append(lossSum / miniBatchSize + regVal)//miniBatchSize=样本中对象数量*batch因子,regVal为回归因子
      val update = updater.compute(
        weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)//weights:属性向量中设置的权重因子,regParam:为回归參数。stepSize:计算步长,i:当前迭代次数
      /**
       * class SimpleUpdater extends Updater {
       * /**
          * weihtsOld:上一次迭代计算后的特征权重向量
          * gradient:本次迭代计算的特征权重向量
          * stepSize:迭代步长
          * iter:当前迭代次数
          * regParam:回归參数 
          */
		  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
		      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
		    val thisIterStepSize = stepSize / math.sqrt(iter)//以当前迭代次数的平方根的倒数作为本次迭代趋近(下降)的因子
		    val normGradient = gradient.mul(thisIterStepSize)
		    (weightsOld.sub(normGradient), 0)//返回本次剃度下降后更新的特征权重向量
		  }
		}
	   *
       */
      weights = update._1
      regVal = update._2//使用SimpleUpdater值为0
    }

    logInfo("GradientDescent finished. Last 10 stochastic losses %s".format(
      stochasticLossHistory.takeRight(10).mkString(", ")))

    (weights.toArray, stochasticLossHistory.toArray)
  }

  在MiniBatchSGD中主要实现对输入数据集进行迭代抽样,通过使用SquaredGradient作为梯度下降算法,使用SimpleUpdater作为更新算法,不断对抽样数据集进行迭代计算从而找出最优的特征权重向量解。

  使用官方測试代码例如以下:

  def linearRegressionAPITest(sc: SparkContext) {
    val url = "/Users/yangguo/hadoop/spark/mllib/data/ridge-data/lpsa.data"
    val data = sc.textFile(url)
    val parseData = data.map { line =>
      val parts = line.split(',')
      LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray)
    }
    val numIterations = 20
    val model = LinearRegressionWithSGD.train(parseData, numIterations)
    val valuesAndPreds = parseData.map { point =>
      val prediction = model.predict(point.features)
      (point, prediction)
    }
    valuesAndPreds.foreach { case (v, p) => print("[" + v.label + "," + p + "]"); v.features.foreach(base => print(base + "--")); println("
") }
    val isSuccessed = valuesAndPreds.map { case (v, p) => math.pow((p - v.label), 2) }.reduce(_ + _) / valuesAndPreds.count
    println(isSuccessed)
  }


參考:

[1].http://rdc.taobao.org/?p=2163

[2].http://cs229.stanford.edu/notes/cs229-notes1.pdf

[3].http://blog.sina.com.cn/s/blog_62339a2401015jyq.html

[4].http://blog.csdn.net/pennyliang/article/details/6998517

[5].http://en.wikipedia.org/wiki/Lasso_(statistics)#Lasso_method










原文地址:https://www.cnblogs.com/cynchanpin/p/7105151.html