PCA in MLLib

SVD分解: (A=USigma V^T),变换:(hat{A}=Acdot V=USigma)

分解时先计算(A^TA=USigma^2U^T),再进行SVD分解

/**
   * Computes the top k principal components and a vector of proportions of
   * variance explained by each principal component.
   * Rows correspond to observations and columns correspond to variables.
   * The principal components are stored a local matrix of size n-by-k.
   * Each column corresponds for one principal component,
   * and the columns are in descending order of component variance.
   * The row data do not need to be "centered" first; it is not necessary for
   * the mean of each column to be 0.
   *
   * @param k number of top principal components.
   * @return a matrix of size n-by-k, whose columns are principal components, and
   * a vector of values which indicate how much variance each principal component
   * explains
   *
   * @note This cannot be computed on matrices with more than 65535 columns.
   */
  @Since("1.6.0")
  def computePrincipalComponentsAndExplainedVariance(k: Int): (Matrix, Vector) = {
    val n = numCols().toInt
    require(k > 0 && k <= n, s"k = $k out of range (0, n = $n]")
    
    // spark 分布式计算A^T A
    val Cov = computeCovariance().asBreeze.asInstanceOf[BDM[Double]]

    // Breeze计算svd分解
    val brzSvd.SVD(u: BDM[Double], s: BDV[Double], _) = brzSvd(Cov)
    // explained varience 归一化成Ratio
    val eigenSum = s.data.sum
    val explainedVariance = s.data.map(_ / eigenSum)
    // 返回U,∑
    if (k == n) {
      (Matrices.dense(n, k, u.data), Vectors.dense(explainedVariance))
    } else {
      (Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)),
        Vectors.dense(Arrays.copyOfRange(explainedVariance, 0, k)))
    }
  }

计算R:

分布式计算(R=A^TA)
其中(dim(A)=mcdot n),大数据场景下m会很大,但是n一般不会很大。所以计算结果(R)的维度也不会非常大,对(R)进行PCA分解的复杂度可控,单线程计算即可。
分布式计算自相关矩阵(R)的公式:

[egin{align*} ext{calc } A^T A &:\ &r_{ij} = sum_{k=1}^m a_{ki}cdot a_{kj}, ext{where }i,jin 1,...,n\ ext{So, }& ext{R} = sum_{k=1}^m vec{a}_k^T vec{a}_k, ext{where }vec{a}_k=[a_{k1},...,a_{kn}], ext{ $k^{th}$ row} end{align*} ]

Spark代码:

/**
* Computes the Gramian matrix `A^T A`.
*
* @note This cannot be computed on matrices with more than 65535 columns.
*/
@Since("1.0.0")
def computeGramianMatrix(): Matrix = {
val n = numCols().toInt
checkNumColumns(n)
// Computes n*(n+1)/2, avoiding overflow in the multiplication.
// This succeeds when n <= 65535, which is checked above
val nt = if (n % 2 == 0) ((n / 2) * (n + 1)) else (n * ((n + 1) / 2))

// Compute the upper triangular part of the gram matrix.
val GU = rows.treeAggregate(new BDV[Double](nt))(
seqOp = (U, v) => {
BLAS.spr(1.0, v, U.data)
U
}, combOp = (U1, U2) => U1 += U2)

RowMatrix.triuToFull(n, GU.data)
}

SVD分解:

调用Breeze的SVD库,得到(U,Sigma)

    val brzSvd.SVD(u: BDM[Double], s: BDV[Double], _) = brzSvd(Cov)
    // Explained variance 归一化
    val eigenSum = s.data.sum
    val explainedVariance = s.data.map(_ / eigenSum)

    if (k == n) {
      (Matrices.dense(n, k, u.data), Vectors.dense(explainedVariance))
    } else {
      (Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)),
        Vectors.dense(Arrays.copyOfRange(explainedVariance, 0, k)))
    }

Explained Variance Ratio

explained variance ratio of each principal component. It indicates
the proportion of the dataset’s variance that lies along the axis of each principal component.

原文地址:https://www.cnblogs.com/luweiseu/p/7825826.html