StructuredStream StateStore Mechanism

ref: https://jaceklaskowski.gitbooks.io/spark-structured-streaming/
StructuredStream's stateful implementation is built on StateStore, which remembers historical results and thereby enables unbounded streaming computation. Internally, the historical aggregation results are kept in a StateStore (currently backed by HDFS). Each micro-batch executes StateStoreRestore -> Agg -> StateStoreSave.
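
For context, the kind of user query that drives this pipeline is a streaming aggregation. Below is a minimal sketch; the socket source, host/port, and checkpoint path are illustrative assumptions, not taken from the original post.

// A word count over a socket source; groupBy + count is a stateful
// aggregation, so each micro-batch runs StateStoreRestore -> Agg -> StateStoreSave.
import org.apache.spark.sql.SparkSession

object StatefulWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("StatefulWordCount").master("local[2]").getOrCreate()
    import spark.implicits._

    val lines = spark.readStream.format("socket")
      .option("host", "localhost").option("port", 9999).load()

    val counts = lines.as[String]
      .flatMap(_.split("\\s+"))
      .groupBy($"value")
      .count()

    counts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", "/tmp/wordcount-checkpoint") // illustrative path; state versions are persisted under it
      .start()
      .awaitTermination()
  }
}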

The stateful mechanism relies on StateStoreRDD.

[Figure: logical plan]

Both StateStoreRestore and StateStoreSave are built on StateStoreRDD.


StateStoreRDD uses the StateStoreCoordinator to look up the location of the state, which it reports as its preferred location.
Its data comes from two places: the historical results in the StateStore and the RDD data of the new batch.

StateStoreRDD is an RDD for executing storeUpdateFunction with StateStore (and data from partitions of a new batch RDD).

Finally, StateStoreRDD merges the historical state with the new batch data:

// StateStoreRDD#compute
override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
    var store: StateStore = null
    val storeId = StateStoreId(checkpointLocation, operatorId, partition.index)
    store = StateStore.get(
      storeId, keySchema, valueSchema, storeVersion, storeConf, confBroadcast.value.value) // obtain the StateStore
    val inputIter = dataRDD.iterator(partition, ctxt)  // data of the new batch
    storeUpdateFunction(store, inputIter)  // combine them; the logic differs between Restore and Save
  }
storeUpdateFunction of StateStoreRestore

During Restore, the merge logic joins the historical state and the new batch data on the same key, mainly by calling store#get(key):

{ case (store, iter) =>
        val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
        iter.flatMap { row =>
          val key = getKey(row)
          val savedState = store.get(key)
          numOutputRows += 1
          row +: savedState.toSeq
        }
}
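
To make the Restore behavior concrete, here is a toy illustration in plain Scala (not Spark code; the data is made up): for each key in the new batch, the restored iterator carries the new row followed by whatever was previously saved for that key, so the downstream aggregation can merge them.

// savedState plays the role of the StateStore; newBatch is this batch's partial result
val savedState = Map("a" -> 3, "b" -> 1)
val newBatch   = Seq("a" -> 2, "c" -> 5)

val restored = newBatch.flatMap { case (k, v) =>
  // mirrors `row +: savedState.toSeq`: the new row first, then the saved state (if any)
  (k -> v) +: savedState.get(k).map(k -> _).toSeq
}
// restored == Seq("a" -> 2, "a" -> 3, "c" -> 5); the Agg that follows folds
// rows with the same key back into a single up-to-date value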
storeUpdateFunction of StateStoreSave (taking outputMode = Complete as the example); it mainly calls store#put(key, value):
{ (store, iter) =>
        val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
        ...
        outputMode match {
          // Update and output all rows in the StateStore.
          case Some(Complete) =>
            while (iter.hasNext) {
              val row = iter.next().asInstanceOf[UnsafeRow]
              val key = getKey(row)
              store.put(key.copy(), row.copy())
              numUpdatedStateRows += 1
            }
            store.commit()
            numTotalStateRows += store.numKeys()
            store.iterator().map { case (k, v) =>
              numOutputRows += 1
              v.asInstanceOf[InternalRow]
            }
...
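
Correspondingly, here is a toy illustration of the Complete-mode Save step (again plain Scala with made-up data, continuing the Restore sketch above): every merged row is written back into the store under its key, the store is committed as a new version, and the entire store contents become the batch output.

import scala.collection.mutable

val store      = mutable.Map("a" -> 3, "b" -> 1)   // state committed by previous batches
val mergedRows = Seq("a" -> 5, "c" -> 5)            // output of Agg for this batch

mergedRows.foreach { case (k, v) => store.put(k, v) }  // mirrors store.put(key, row)
// store.commit() would persist version N + 1 to HDFS at this point
val output = store.toSeq                             // Complete mode outputs the whole store
// output contains ("a" -> 5), ("b" -> 1), ("c" -> 5)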

StateStore (HDFSBackedStateStore)

A quick way to understand StateStore: intuitively, to implement stateful processing under the DStream framework we would also keep the historical state in an RDD and, after each new batch is computed, merge it with that historical RDD (using checkpoints to avoid an overly long lineage). That idea is perfectly sound and close to what StructuredStream does; the main additions are:

  1. the key/value schema (see the sketch after this list)
  2. preferred location optimization
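
Regarding point 1: the state keys and values are UnsafeRows described by StructTypes derived from the query. For a word-count style aggregation the schemas would look roughly like this (illustrative; not taken from an actual plan):

import org.apache.spark.sql.types._

// keySchema: the grouping columns; valueSchema: the aggregation result row
val keySchema   = new StructType().add("word", StringType)
val valueSchema = new StructType().add("count", LongType)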

StateStoreRDD is a logical RDD, in the sense that its data actually comes from the history + the new batch.

  • Its partitions are the partitions of the new batch:
override protected def getPartitions: Array[Partition] = dataRDD.partitions
  • preferredLocation selection:
    For a partition p1 -> compute the storeId of its corresponding historical state store -> ask the StateStoreCoordinator for that storeId's location. (Note: this is only a locality hint and may be absent.)
    A StateStoreId is uniquely determined by (checkpointLocation, operatorId, partition.index).
override def getPreferredLocations(partition: Partition): Seq[String] = {
    val storeId = StateStoreId(checkpointLocation, operatorId, partition.index)
    storeCoordinator.flatMap(_.getLocation(storeId)).toSeq
  }
  • the compute process:
override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
    var store: StateStore = null
    val storeId = StateStoreId(checkpointLocation, operatorId, partition.index)
    store = StateStore.get(
      storeId, keySchema, valueSchema, storeVersion, storeConf, confBroadcast.value.value)
    val inputIter = dataRDD.iterator(partition, ctxt)
    storeUpdateFunction(store, inputIter)
  }

※ Obtain the store based on the storeId, key/value schemas, version, and other information (StateStore#get):

  def get(
      storeId: StateStoreId,
      keySchema: StructType,
      valueSchema: StructType,
      version: Long,
      storeConf: StateStoreConf,
      hadoopConf: Configuration): StateStore = {
    require(version >= 0)
    val storeProvider = loadedProviders.synchronized {
      startMaintenanceIfNeeded()
      val provider = loadedProviders.getOrElseUpdate(
        storeId,
        new HDFSBackedStateStoreProvider(storeId, keySchema, valueSchema, storeConf, hadoopConf))
      reportActiveStoreInstance(storeId)
      provider
    }
    storeProvider.getStore(version)
  }

→ storeProvider.getStore(version)
The store is backed by type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow];
loadMap reads the data for the requested version from HDFS into this map.

  override def getStore(version: Long): StateStore = synchronized {
    require(version >= 0, "Version cannot be less than 0")
    val newMap = new MapType()
    if (version > 0) {
      newMap.putAll(loadMap(version))
    }
    val store = new HDFSBackedStateStore(version, newMap)
    logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for update")
    store
  }
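
To summarize the versioned-map idea behind HDFSBackedStateStoreProvider, here is a heavily simplified sketch; the names, types, and structure are illustrative and not the actual Spark implementation. Each committed version is a complete key -> value map; getStore(v) starts the next version from a copy of version v, and committing freezes the result as version v + 1 (the real provider persists delta files to the checkpoint location on HDFS and compacts them during maintenance).

import scala.collection.mutable

// Toy provider: version 0 is the empty state
class ToyStateStoreProvider {
  private val committed = mutable.Map[Long, Map[String, Int]](0L -> Map.empty)

  // analogous to getStore(version): copy the committed map of `version`
  // as the working map for the next update
  def getStore(version: Long): mutable.Map[String, Int] = {
    require(version >= 0, "Version cannot be less than 0")
    mutable.Map(committed(version).toSeq: _*)
  }

  // analogous to store.commit(): freeze the working map as version + 1
  // (the real provider would write a delta file to HDFS here)
  def commit(version: Long, updated: mutable.Map[String, Int]): Long = {
    committed(version + 1) = updated.toMap
    version + 1
  }
}

// usage: batch N reads version N, updates it, and commits version N + 1
val provider = new ToyStateStoreProvider
val working  = provider.getStore(0L)
working.put("a", 3)
val newVersion = provider.commit(0L, working)   // newVersion == 1
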
Original article: https://www.cnblogs.com/luweiseu/p/7735821.html