Spark MLlib LDA 源代码解析

1、Spark MLlib LDA源代码解析

Spark MLlib LDA 应该算是比較难理解的,当中涉及到大量的概率与统计的相关知识,并且还涉及到了Spark GraphX图计算方面的知识。要想明确当中的原理得要下一番功夫。

LDA源代码解析前的基础知识:

1)LDA主题模型的理论知识

參照:LDA数学八卦

2)SparkGraphX 基础知识

http://blog.csdn.net/sunbow0/article/details/47612291

http://blog.csdn.net/sunbow0/article/details/47610481

1.1 LDA源代码解析 

class LDA private (
  private var k: Int,
  private var maxIterations: Int,
  private var docConcentration: Double,
  private var topicConcentration: Double,
  private var seed: Long,
  private var checkpointInterval: Int,
  private var ldaOptimizer: LDAOptimizer) extends Logging {
  /**
   * k: 主题数量
   * maxIterations: 迭代次数
   * docConcentration: 超參alpha
   * topicConcentration: 超參beta
   * seed: 随机种子
   * checkpointInterval: 检查间隔
   * ldaOptimizer: 优化方法 "em" "online"
   *
   */
  def this() = this(k = 10, maxIterations = 20, docConcentration = -1, topicConcentration = -1,
    seed = Utils.random.nextLong(), checkpointInterval = 10, ldaOptimizer = new EMLDAOptimizer)

  /**
   * Number of topics to infer.  I.e., the number of soft cluster centers.
   */
  // 获取  主题数量
  def getK: Int = k

  /**
   * Number of topics to infer.  I.e., the number of soft cluster centers.
   * (default = 10)
   */
  // 设置  主题数量
  def setK(k: Int): this.type = {
    require(k > 0, s"LDA k (number of clusters) must be > 0, but was set to $k")
    this.k = k
    this
  }

  /**
   * Concentration parameter (commonly named "alpha") for the prior placed on documents'
   * distributions over topics ("theta").
   *
   * This is the parameter to a symmetric Dirichlet distribution.
   */
  // 获取  超參alpha
  def getDocConcentration: Double = this.docConcentration

  /**
   * Concentration parameter (commonly named "alpha") for the prior placed on documents'
   * distributions over topics ("theta").
   *
   * This is the parameter to a symmetric Dirichlet distribution, where larger values
   * mean more smoothing (more regularization).
   *
   * If set to -1, then docConcentration is set automatically.
   *  (default = -1 = automatic)
   *
   * Optimizer-specific parameter settings:
   *  - EM
   *     - Value should be > 1.0
   *     - default = (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
   *  - Online
   *     - Value should be >= 0
   *     - default = (1.0 / k), following the implementation from
   *       [[https://github.com/Blei-Lab/onlineldavb]].
   */
  // 设置  超參alpha
  def setDocConcentration(docConcentration: Double): this.type = {
    this.docConcentration = docConcentration
    this
  }

  // 获取  超參alpha
  /** Alias for [[getDocConcentration]] */
  def getAlpha: Double = getDocConcentration

  // 设置  超參alpha
  /** Alias for [[setDocConcentration()]] */
  def setAlpha(alpha: Double): this.type = setDocConcentration(alpha)

  /**
   * Concentration parameter (commonly named "beta" or "eta") for the prior placed on topics'
   * distributions over terms.
   *
   * This is the parameter to a symmetric Dirichlet distribution.
   *
   * Note: The topics' distributions over terms are called "beta" in the original LDA paper
   * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
   */
  // 获取  超參beta
  def getTopicConcentration: Double = this.topicConcentration

  /**
   * Concentration parameter (commonly named "beta" or "eta") for the prior placed on topics'
   * distributions over terms.
   *
   * This is the parameter to a symmetric Dirichlet distribution.
   *
   * Note: The topics' distributions over terms are called "beta" in the original LDA paper
   * by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
   *
   * If set to -1, then topicConcentration is set automatically.
   *  (default = -1 = automatic)
   *
   * Optimizer-specific parameter settings:
   *  - EM
   *     - Value should be > 1.0
   *     - default = 0.1 + 1, where 0.1 gives a small amount of smoothing and +1 follows
   *       Asuncion et al. (2009), who recommend a +1 adjustment for EM.
   *  - Online
   *     - Value should be >= 0
   *     - default = (1.0 / k), following the implementation from
   *       [[https://github.com/Blei-Lab/onlineldavb]].
   */
  // 设置  超參beta
  def setTopicConcentration(topicConcentration: Double): this.type = {
    this.topicConcentration = topicConcentration
    this
  }

  // 获取  超參beta
  /** Alias for [[getTopicConcentration]] */
  def getBeta: Double = getTopicConcentration

  // 设置  超參beta
  /** Alias for [[setTopicConcentration()]] */
  def setBeta(beta: Double): this.type = setTopicConcentration(beta)

  /**
   * Maximum number of iterations for learning.
   */
  // 获取  迭代次数
  def getMaxIterations: Int = maxIterations

  /**
   * Maximum number of iterations for learning.
   * (default = 20)
   */
  // 获取  迭代次数
  def setMaxIterations(maxIterations: Int): this.type = {
    this.maxIterations = maxIterations
    this
  }

  /** Random seed */
  // 获取  随机种子
  def getSeed: Long = seed

  /** Random seed */
  // 设置  随机种子
  def setSeed(seed: Long): this.type = {
    this.seed = seed
    this
  }

  /**
   * Period (in iterations) between checkpoints.
   */
  // 检查间隔
  def getCheckpointInterval: Int = checkpointInterval

  /**
   * Period (in iterations) between checkpoints (default = 10). Checkpointing helps with recovery
   * (when nodes fail). It also helps with eliminating temporary shuffle files on disk, which can be
   * important when LDA is run for many iterations. If the checkpoint directory is not set in
   * [[org.apache.spark.SparkContext]], this setting is ignored.
   *
   * @see [[org.apache.spark.SparkContext#setCheckpointDir]]
   */
  def setCheckpointInterval(checkpointInterval: Int): this.type = {
    this.checkpointInterval = checkpointInterval
    this
  }

  /**
   * :: DeveloperApi ::
   *
   * LDAOptimizer used to perform the actual calculation
   */
  @DeveloperApi
  def getOptimizer: LDAOptimizer = ldaOptimizer

  /**
   * :: DeveloperApi ::
   *
   * LDAOptimizer used to perform the actual calculation (default = EMLDAOptimizer)
   */
  @DeveloperApi
  def setOptimizer(optimizer: LDAOptimizer): this.type = {
    this.ldaOptimizer = optimizer
    this
  }

  /**
   * Set the LDAOptimizer used to perform the actual calculation by algorithm name.
   * Currently "em", "online" are supported.
   */
  // 优化方法
  def setOptimizer(optimizerName: String): this.type = {
    this.ldaOptimizer =
      optimizerName.toLowerCase match {
        case "em" => new EMLDAOptimizer
        case "online" => new OnlineLDAOptimizer
        case other =>
          throw new IllegalArgumentException(s"Only em, online are supported but got $other.")
      }
    this
  }

  /**
   * Learn an LDA model using the given dataset.
   *
   * @param documents  RDD of documents, which are term (word) count vectors paired with IDs.
   *                   The term count vectors are "bags of words" with a fixed-size vocabulary
   *                   (where the vocabulary size is the length of the vector).
   *                   Document IDs must be unique and >= 0.
   * @return  Inferred LDA model
   */
  // LDA 模型開始训练。输入数据是文档的词向量RDD[(Long, Vector)]
  // ldaOptimizer.initialize(documents, this) 是初始化ldaOptimizer
  // state.next()。ldaOptimizer迭代下一步
  // state.getLDAModel 模型生成
  def run(documents: RDD[(Long, Vector)]): LDAModel = {
    val state = ldaOptimizer.initialize(documents, this)
    var iter = 0
    val iterationTimes = Array.fill[Double](maxIterations)(0)
    while (iter < maxIterations) {
      val start = System.nanoTime()
      state.next()
      val elapsedSeconds = (System.nanoTime() - start) / 1e9
      iterationTimes(iter) = elapsedSeconds
      iter += 1
    }
    state.getLDAModel(iterationTimes)
  }

  /** Java-friendly version of [[run()]] */
  def run(documents: JavaPairRDD[java.lang.Long, Vector]): LDAModel = {
    run(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
  }
}

private[clustering] object LDA {

  /*
    DEVELOPERS NOTE:

    This implementation uses GraphX, where the graph is bipartite with 2 types of vertices:
     - Document vertices
        - indexed with unique indices >= 0
        - Store vectors of length k (# topics).
     - Term vertices
        - indexed {-1, -2, ..., -vocabSize}
        - Store vectors of length k (# topics).
     - Edges correspond to terms appearing in documents.
        - Edges are directed Document -> Term.
        - Edges are partitioned by documents.

    Info on EM implementation.
     - We follow Section 2.2 from Asuncion et al., 2009.  We use some of their notation.
     - In this implementation, there is one edge for every unique term appearing in a document,
       i.e., for every unique (document, term) pair.
     - Notation:
        - N_{wkj} = count of tokens of term w currently assigned to topic k in document j
        - N_{*} where * is missing a subscript w/k/j is the count summed over missing subscript(s)
        - gamma_{wjk} = P(z_i = k | x_i = w, d_i = j),
          the probability of term x_i in document d_i having topic z_i.
     - Data graph
        - Document vertices store N_{kj}
        - Term vertices store N_{wk}
        - Edges store N_{wj}.
        - Global data N_k
     - Algorithm
        - Initial state:
           - Document and term vertices store random counts N_{wk}, N_{kj}.
        - E-step: For each (document,term) pair i, compute P(z_i | x_i, d_i).
           - Aggregate N_k from term vertices.
           - Compute gamma_{wjk} for each possible topic k, from each triplet.
             using inputs N_{wk}, N_{kj}, N_k.
        - M-step: Compute sufficient statistics for hidden parameters phi and theta
          (counts N_{wk}, N_{kj}, N_k).
           - Document update:
              - N_{kj} <- sum_w N_{wj} gamma_{wjk}
              - N_j <- sum_k N_{kj}  (only needed to output predictions)
           - Term update:
              - N_{wk} <- sum_j N_{wj} gamma_{wjk}
              - N_k <- sum_w N_{wk}

    TODO: Add simplex constraints to allow alpha in (0,1).
          See: Vorontsov and Potapenko. "Tutorial on Probabilistic Topic Modeling : Additive
               Regularization for Stochastic Matrix Factorization." 2014.
   */

  /**
   * Vector over topics (length k) of token counts.
   * The meaning of these counts can vary, and it may or may not be normalized to be a distribution.
   */
  /**
   * 自己定义类别及方法
   * TopicCounts 主题分布统计
   * TokenCount 词汇统计
   * term2index index2term 顶点与词汇id 转换 
   * computePTopic 计算主题分布
   * 
   */
  private[clustering]type TopicCounts = BDV[Double]

  private[clustering]type TokenCount = Double

  /** Term vertex IDs are {-1, -2, ..., -vocabSize} */
  private[clustering] def term2index(term: Int): Long = -(1 + term.toLong)

  private[clustering] def index2term(termIndex: Long): Int = -(1 + termIndex).toInt

  private[clustering] def isDocumentVertex(v: (VertexId, _)): Boolean = v._1 >= 0

  private[clustering] def isTermVertex(v: (VertexId, _)): Boolean = v._1 < 0

  /**
   * Compute gamma_{wjk}, a distribution over topics k.
   */
  // docTopicCounts文章的主题分布, termTopicCounts词汇的主题分布, totalTopicCounts有词的主题分布概率和
  // vocabSize词向量长度,eta alpha 超參
  private[clustering] def computePTopic(
    docTopicCounts: TopicCounts,
    termTopicCounts: TopicCounts,
    totalTopicCounts: TopicCounts,
    vocabSize: Int,
    eta: Double,
    alpha: Double): TopicCounts = {
    val K = docTopicCounts.length
    val N_j = docTopicCounts.data
    val N_w = termTopicCounts.data
    val N = totalTopicCounts.data
    val eta1 = eta - 1.0
    val alpha1 = alpha - 1.0
    val Weta1 = vocabSize * eta1
    var sum = 0.0
    val gamma_wj = new Array[Double](K)
    var k = 0
    while (k < K) {
      val gamma_wjk = (N_w(k) + eta1) * (N_j(k) + alpha1) / (N(k) + Weta1)
      gamma_wj(k) = gamma_wjk
      sum += gamma_wjk
      k += 1
    }
    // normalize
    BDV(gamma_wj) /= sum
  }
}

1.2 LDAModel源代码解析 

abstract class LDAModel private[clustering] {

  /** Number of topics */
  // 主题数
  def k: Int

  /** Vocabulary size (number of terms or terms in the vocabulary) */
  // 词向量长度
  def vocabSize: Int

  /**
   * Inferred topics, where each topic is represented by a distribution over terms.
   * This is a matrix of size vocabSize x k, where each column is a topic.
   * No guarantees are given about the ordering of the topics.
   */
  // 主题分布矩阵
  def topicsMatrix: Matrix

  /**
   * Return the topics described by weighted terms.
   *
   * This limits the number of terms per topic.
   * This is approximate; it may not return exactly the top-weighted terms for each topic.
   * To get a more precise set of top terms, increase maxTermsPerTopic.
   *
   * @param maxTermsPerTopic  Maximum number of terms to collect for each topic.
   * @return  Array over topics.  Each topic is represented as a pair of matching arrays:
   *          (term indices, term weights in topic).
   *          Each topic's terms are sorted in order of decreasing weight.
   */
  // 每个主题的词权重排序
  def describeTopics(maxTermsPerTopic: Int): Array[(Array[Int], Array[Double])]

  /**
   * Return the topics described by weighted terms.
   *
   * WARNING: If vocabSize and k are large, this can return a large object!
   *
   * @return  Array over topics.  Each topic is represented as a pair of matching arrays:
   *          (term indices, term weights in topic).
   *          Each topic's terms are sorted in order of decreasing weight.
   */
  def describeTopics(): Array[(Array[Int], Array[Double])] = describeTopics(vocabSize)

  /* TODO (once LDA can be trained with Strings or given a dictionary)
   * Return the topics described by weighted terms.
   *
   * This is similar to [[describeTopics()]] but returns String values for terms.
   * If this model was trained using Strings or was given a dictionary, then this method returns
   * terms as text.  Otherwise, this method returns terms as term indices.
   *
   * This limits the number of terms per topic.
   * This is approximate; it may not return exactly the top-weighted terms for each topic.
   * To get a more precise set of top terms, increase maxTermsPerTopic.
   *
   * @param maxTermsPerTopic  Maximum number of terms to collect for each topic.
   * @return  Array over topics.  Each topic is represented as a pair of matching arrays:
   *          (terms, term weights in topic) where terms are either the actual term text
   *          (if available) or the term indices.
   *          Each topic's terms are sorted in order of decreasing weight.
   */
  // def describeTopicsAsStrings(maxTermsPerTopic: Int): Array[(Array[Double], Array[String])]

  /* TODO (once LDA can be trained with Strings or given a dictionary)
   * Return the topics described by weighted terms.
   *
   * This is similar to [[describeTopics()]] but returns String values for terms.
   * If this model was trained using Strings or was given a dictionary, then this method returns
   * terms as text.  Otherwise, this method returns terms as term indices.
   *
   * WARNING: If vocabSize and k are large, this can return a large object!
   *
   * @return  Array over topics.  Each topic is represented as a pair of matching arrays:
   *          (terms, term weights in topic) where terms are either the actual term text
   *          (if available) or the term indices.
   *          Each topic's terms are sorted in order of decreasing weight.
   */
  // def describeTopicsAsStrings(): Array[(Array[Double], Array[String])] =
  //  describeTopicsAsStrings(vocabSize)

  /* TODO
   * Compute the log likelihood of the observed tokens, given the current parameter estimates:
   *  log P(docs | topics, topic distributions for docs, alpha, eta)
   *
   * Note:
   *  - This excludes the prior.
   *  - Even with the prior, this is NOT the same as the data log likelihood given the
   *    hyperparameters.
   *
   * @param documents  RDD of documents, which are term (word) count vectors paired with IDs.
   *                   The term count vectors are "bags of words" with a fixed-size vocabulary
   *                   (where the vocabulary size is the length of the vector).
   *                   This must use the same vocabulary (ordering of term counts) as in training.
   *                   Document IDs must be unique and >= 0.
   * @return  Estimated log likelihood of the data under this model
   */
  // def logLikelihood(documents: RDD[(Long, Vector)]): Double

  /* TODO
   * Compute the estimated topic distribution for each document.
   * This is often called 'theta' in the literature.
   *
   * @param documents  RDD of documents, which are term (word) count vectors paired with IDs.
   *                   The term count vectors are "bags of words" with a fixed-size vocabulary
   *                   (where the vocabulary size is the length of the vector).
   *                   This must use the same vocabulary (ordering of term counts) as in training.
   *                   Document IDs must be unique and >= 0.
   * @return  Estimated topic distribution for each document.
   *          The returned RDD may be zipped with the given RDD, where each returned vector
   *          is a multinomial distribution over topics.
   */
  // def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)]

}

/**
 * :: Experimental ::
 *
 * Local LDA model.
 * This model stores only the inferred topics.
 * It may be used for computing topics for new documents, but it may give less accurate answers
 * than the [[DistributedLDAModel]].
 *
 * @param topics Inferred topics (vocabSize x k matrix).
 */
// Local模式的LDA模型
@Experimental
class LocalLDAModel private[clustering] (
    private val topics: Matrix) extends LDAModel with Serializable {

  override def k: Int = topics.numCols

  override def vocabSize: Int = topics.numRows

  override def topicsMatrix: Matrix = topics

  override def describeTopics(maxTermsPerTopic: Int): Array[(Array[Int], Array[Double])] = {
    val brzTopics = topics.toBreeze.toDenseMatrix
    Range(0, k).map { topicIndex =>
      val topic = normalize(brzTopics(::, topicIndex), 1.0)
      val (termWeights, terms) =
        topic.toArray.zipWithIndex.sortBy(-_._1).take(maxTermsPerTopic).unzip
      (terms.toArray, termWeights.toArray)
    }.toArray
  }

  // TODO
  // override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???

// TODO: // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ??

? } /** * :: Experimental :: * * Distributed LDA model. * This model stores the inferred topics, the full training dataset, and the topic distributions. * When computing topics for new documents, it may give more accurate answers * than the [[LocalLDAModel]]. */ // 分布式的LDA模型 @Experimental class DistributedLDAModel private ( private val graph: Graph[LDA.TopicCounts, LDA.TokenCount], private val globalTopicTotals: LDA.TopicCounts, val k: Int, val vocabSize: Int, private val docConcentration: Double, private val topicConcentration: Double, private[spark] val iterationTimes: Array[Double]) extends LDAModel { import LDA._ private[clustering] def this(state: EMLDAOptimizer, iterationTimes: Array[Double]) = { this(state.graph, state.globalTopicTotals, state.k, state.vocabSize, state.docConcentration, state.topicConcentration, iterationTimes) } /** * Convert model to a local model. * The local model stores the inferred topics but not the topic distributions for training * documents. */ def toLocal: LocalLDAModel = new LocalLDAModel(topicsMatrix) /** * Inferred topics, where each topic is represented by a distribution over terms. * This is a matrix of size vocabSize x k, where each column is a topic. * No guarantees are given about the ordering of the topics. * * WARNING: This matrix is collected from an RDD. Beware memory usage when vocabSize, k are large. */ // 主题的概率分布矩阵,列代表主题。行代表词典。每一行代表词的主题分布概率 override lazy val topicsMatrix: Matrix = { // Collect row-major topics val termTopicCounts: Array[(Int, TopicCounts)] = graph.vertices.filter(_._1 < 0).map { case (termIndex, cnts) => (index2term(termIndex), cnts) }.collect() // Convert to Matrix val brzTopics = BDM.zeros[Double](vocabSize, k) termTopicCounts.foreach { case (term, cnts) => var j = 0 while (j < k) { brzTopics(term, j) = cnts(j) j += 1 } } Matrices.fromBreeze(brzTopics) } // 每个主题的词典权重排序。格式(词汇id(依照权重由大到小排序),词在主题上的权重) override def describeTopics(maxTermsPerTopic: Int): Array[(Array[Int], Array[Double])] = { val numTopics = k // Note: N_k is not needed to find the top terms, but it is needed to normalize weights // to a distribution over terms. val N_k: TopicCounts = globalTopicTotals val topicsInQueues: Array[BoundedPriorityQueue[(Double, Int)]] = graph.vertices.filter(isTermVertex) .mapPartitions { termVertices => // For this partition, collect the most common terms for each topic in queues: // queues(topic) = queue of (term weight, term index). // Term weights are N_{wk} / N_k. val queues = Array.fill(numTopics)(new BoundedPriorityQueue[(Double, Int)](maxTermsPerTopic)) for ((termId, n_wk) <- termVertices) { var topic = 0 while (topic < numTopics) { queues(topic) += (n_wk(topic) / N_k(topic) -> index2term(termId.toInt)) topic += 1 } } Iterator(queues) }.reduce { (q1, q2) => q1.zip(q2).foreach { case (a, b) => a ++= b} q1 } topicsInQueues.map { q => val (termWeights, terms) = q.toArray.sortBy(-_._1).unzip (terms.toArray, termWeights.toArray) } } // TODO // override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???

/** * Log likelihood of the observed tokens in the training set, * given the current parameter estimates: * log P(docs | topics, topic distributions for docs, alpha, eta) * * Note: * - This excludes the prior; for that, use [[logPrior]]. * - Even with [[logPrior]], this is NOT the same as the data log likelihood given the * hyperparameters. */ // 对数似然log P(docs | topics, topic distributions for docs, alpha, eta) lazy val logLikelihood: Double = { val eta = topicConcentration val alpha = docConcentration assert(eta > 1.0) assert(alpha > 1.0) val N_k = globalTopicTotals val smoothed_N_k: TopicCounts = N_k + (vocabSize * (eta - 1.0)) // Edges: Compute token log probability from phi_{wk}, theta_{kj}. val sendMsg: EdgeContext[TopicCounts, TokenCount, Double] => Unit = (edgeContext) => { val N_wj = edgeContext.attr val smoothed_N_wk: TopicCounts = edgeContext.dstAttr + (eta - 1.0) val smoothed_N_kj: TopicCounts = edgeContext.srcAttr + (alpha - 1.0) val phi_wk: TopicCounts = smoothed_N_wk :/ smoothed_N_k val theta_kj: TopicCounts = normalize(smoothed_N_kj, 1.0) val tokenLogLikelihood = N_wj * math.log(phi_wk.dot(theta_kj)) edgeContext.sendToDst(tokenLogLikelihood) } graph.aggregateMessages[Double](sendMsg, _ + _) .map(_._2).fold(0.0)(_ + _) } /** * Log probability of the current parameter estimate: * log P(topics, topic distributions for docs | alpha, eta) */ // 对数概率log P(topics, topic distributions for docs | alpha, eta) lazy val logPrior: Double = { val eta = topicConcentration val alpha = docConcentration // Term vertices: Compute phi_{wk}. Use to compute prior log probability. // Doc vertex: Compute theta_{kj}. Use to compute prior log probability. val N_k = globalTopicTotals val smoothed_N_k: TopicCounts = N_k + (vocabSize * (eta - 1.0)) val seqOp: (Double, (VertexId, TopicCounts)) => Double = { case (sumPrior: Double, vertex: (VertexId, TopicCounts)) => if (isTermVertex(vertex)) { val N_wk = vertex._2 val smoothed_N_wk: TopicCounts = N_wk + (eta - 1.0) val phi_wk: TopicCounts = smoothed_N_wk :/ smoothed_N_k (eta - 1.0) * brzSum(phi_wk.map(math.log)) } else { val N_kj = vertex._2 val smoothed_N_kj: TopicCounts = N_kj + (alpha - 1.0) val theta_kj: TopicCounts = normalize(smoothed_N_kj, 1.0) (alpha - 1.0) * brzSum(theta_kj.map(math.log)) } } graph.vertices.aggregate(0.0)(seqOp, _ + _) } /** * For each document in the training set, return the distribution over topics for that document * ("theta_doc"). * * @return RDD of (document ID, topic distribution) pairs */ // 返回训练文档的主题分布概率 def topicDistributions: RDD[(Long, Vector)] = { graph.vertices.filter(LDA.isDocumentVertex).map { case (docID, topicCounts) => (docID.toLong, Vectors.fromBreeze(normalize(topicCounts, 1.0))) } } /** Java-friendly version of [[topicDistributions]] */ def javaTopicDistributions: JavaPairRDD[java.lang.Long, Vector] = { JavaPairRDD.fromRDD(topicDistributions.asInstanceOf[RDD[(java.lang.Long, Vector)]]) } // TODO: // override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ??

? }

1.3 LDAOptimizer源代码解析 

sealed trait LDAOptimizer {

  /*
    DEVELOPERS NOTE:

    An LDAOptimizer contains an algorithm for LDA and performs the actual computation, which
    stores internal data structure (Graph or Matrix) and other parameters for the algorithm.
    The interface is isolated to improve the extensibility of LDA.
   */

  /**
   * Initializer for the optimizer. LDA passes the common parameters to the optimizer and
   * the internal structure can be initialized properly.
   */
  private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer

  private[clustering] def next(): LDAOptimizer

  private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel
}

/**
 * :: DeveloperApi ::
 *
 * Optimizer for EM algorithm which stores data + parameter graph, plus algorithm parameters.
 *
 * Currently, the underlying implementation uses Expectation-Maximization (EM), implemented
 * according to the Asuncion et al. (2009) paper referenced below.
 *
 * References:
 *  - Original LDA paper (journal version):
 *    Blei, Ng, and Jordan.  "Latent Dirichlet Allocation."  JMLR, 2003.
 *     - This class implements their "smoothed" LDA model.
 *  - Paper which clearly explains several algorithms, including EM:
 *    Asuncion, Welling, Smyth, and Teh.
 *    "On Smoothing and Inference for Topic Models."  UAI, 2009.
 *
 */
@DeveloperApi
final class EMLDAOptimizer extends LDAOptimizer {

  import LDA._

  /**
   * The following fields will only be initialized through the initialize() method
   */
  private[clustering] var graph: Graph[TopicCounts, TokenCount] = null
  private[clustering] var k: Int = 0
  private[clustering] var vocabSize: Int = 0
  private[clustering] var docConcentration: Double = 0
  private[clustering] var topicConcentration: Double = 0
  private[clustering] var checkpointInterval: Int = 10
  private var graphCheckpointer: PeriodicGraphCheckpointer[TopicCounts, TokenCount] = null

  /**
   * Compute bipartite term/doc graph.
   */
  override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {

    val docConcentration = lda.getDocConcentration
    val topicConcentration = lda.getTopicConcentration
    val k = lda.getK

    // Note: The restriction > 1.0 may be relaxed in the future (allowing sparse solutions),
    // but values in (0,1) are not yet supported.
    require(docConcentration > 1.0 || docConcentration == -1.0, s"LDA docConcentration must be" +
      s" > 1.0 (or -1 for auto) for EM Optimizer, but was set to $docConcentration")
    require(topicConcentration > 1.0 || topicConcentration == -1.0, s"LDA topicConcentration " +
      s"must be > 1.0 (or -1 for auto) for EM Optimizer, but was set to $topicConcentration")

    this.docConcentration = if (docConcentration == -1) (50.0 / k) + 1.0 else docConcentration
    this.topicConcentration = if (topicConcentration == -1) 1.1 else topicConcentration
    val randomSeed = lda.getSeed

    // For each document, create an edge (Document -> Term) for each unique term in the document.
    // 创建文章与词汇的edge。格式:(文章id,词汇id,词频)。对每一个词向量的文档依照此格式创建edge,当中过滤词频为0的词汇。
    val edges: RDD[Edge[TokenCount]] = docs.flatMap { case (docID: Long, termCounts: Vector) =>
      // Add edges for terms with non-zero counts.
      termCounts.toBreeze.activeIterator.filter(_._2 != 0.0).map { case (term, cnt) =>
        Edge(docID, term2index(term), cnt)
      }
    }

    // Create vertices.
    // Initially, we use random soft assignments of tokens to topics (random gamma).
    // edge.attr 是边的属性,edge.srcId 是边的起点,edge.dstId 是边的终点
    // gamma 是生成主题分布的随机向量
    // 返回格式:(顶点。主题分布随机向量)
    // 每一个词节点存储一些权重值。表示这个词语和哪个主题相关。每篇文章节点存储当前文章讨论主题的预计。

val docTermVertices: RDD[(VertexId, TopicCounts)] = { val verticesTMP: RDD[(VertexId, TopicCounts)] = edges.mapPartitionsWithIndex { case (partIndex, partEdges) => val random = new Random(partIndex + randomSeed) partEdges.flatMap { edge => val gamma = normalize(BDV.fill[Double](k)(random.nextDouble()), 1.0) val sum = gamma * edge.attr Seq((edge.srcId, sum), (edge.dstId, sum)) } } verticesTMP.reduceByKey(_ + _) } // Partition such that edges are grouped by document // 创建graph,依据上面生成的顶点docTermVertices和边edges // partitionBy图的分布式存储採用点切割模式 // computeGlobalTopicTotals。计算全部词的主题分布概率和 this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D) this.k = k this.vocabSize = docs.take(1).head._2.size this.checkpointInterval = lda.getCheckpointInterval this.graphCheckpointer = new PeriodicGraphCheckpointer[TopicCounts, TokenCount](graph, checkpointInterval) this.globalTopicTotals = computeGlobalTopicTotals() this } override private[clustering] def next(): EMLDAOptimizer = { require(graph != null, "graph is null, EMLDAOptimizer not initialized.") val eta = topicConcentration val W = vocabSize val alpha = docConcentration val N_k = globalTopicTotals // sendMsg: 发消息函数 // computePTopic计算主题分布 // sendToSrc sendToDst 是发送信息到源和目的属性 // 计算N_{wj} gamma_{wjk} // N_{wj} 词汇w在文档中的频次,gamma_{wjk} 词汇w在文档j中分配给主题k的概率 val sendMsg: EdgeContext[TopicCounts, TokenCount, (Boolean, TopicCounts)] => Unit = (edgeContext) => { // Compute N_{wj} gamma_{wjk} // attr边属性,srcAttr dstAttr 顶点属性 val N_wj = edgeContext.attr // E-STEP: Compute gamma_{wjk} (smoothed topic distributions), scaled by token count // N_{wj}. val scaledTopicDistribution: TopicCounts = computePTopic(edgeContext.srcAttr, edgeContext.dstAttr, N_k, W, eta, alpha) *= N_wj edgeContext.sendToDst((false, scaledTopicDistribution)) edgeContext.sendToSrc((false, scaledTopicDistribution)) } // This is a hack to detect whether we could modify the values in-place. // TODO: Add zero/seqOp/combOp option to aggregateMessages. (SPARK-5438) // mergeMsg:合并消息函数 // 用于Map阶段,每一个edge分区中每一个点收到的消息合并,以及reduce阶段。合并不同分区的消息。

合并vertexId同样的消息。 val mergeMsg: ((Boolean, TopicCounts), (Boolean, TopicCounts)) => (Boolean, TopicCounts) = (m0, m1) => { val sum = if (m0._1) { m0._2 += m1._2 } else if (m1._1) { m1._2 += m0._2 } else { m0._2 + m1._2 } (true, sum) } // M-STEP: Aggregation computes new N_{kj}, N_{wk} counts. // 每一个节点通过收集邻居数据来更新主题权重数据 val docTopicDistributions: VertexRDD[TopicCounts] = graph.aggregateMessages[(Boolean, TopicCounts)](sendMsg, mergeMsg) .mapValues(_._2) // Update the vertex descriptors with the new counts. // 依据最新顶点数据更新图 val newGraph = GraphImpl.fromExistingRDDs(docTopicDistributions, graph.edges) graph = newGraph graphCheckpointer.updateGraph(newGraph) globalTopicTotals = computeGlobalTopicTotals() this } /** * Aggregate distributions over topics from all term vertices. * * Note: This executes an action on the graph RDDs. */ private[clustering] var globalTopicTotals: TopicCounts = null // computeGlobalTopicTotals,计算全部词的主题分布概率和 private def computeGlobalTopicTotals(): TopicCounts = { val numTopics = k graph.vertices.filter(isTermVertex).values.fold(BDV.zeros[Double](numTopics))(_ += _) } // 生成LDA模型 override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = { require(graph != null, "graph is null, EMLDAOptimizer not initialized.") this.graphCheckpointer.deleteAllCheckpoints() new DistributedLDAModel(this, iterationTimes) } }<span style="font-family: 'Microsoft YaHei'; background-color: rgb(255, 255, 255);"> </span>

转载请注明出处:

http://blog.csdn.net/sunbow0

原文地址:https://www.cnblogs.com/zhchoutai/p/7091437.html