【原创】大叔经验分享(23)spark sql插入表时的文件个数研究


spark sql执行insert overwrite table时,写到新表或者新分区的文件个数,有可能是200个,也有可能是任意个,为什么会有这种差别?

首先看一下spark sql执行insert overwrite table流程:

  • 1 创建临时目录,比如
    • .hive-staging_hive_2018-06-23_00-39-39_825_3122897139441535352-2312/-ext-10000
  • 2 将数据写到临时目录;
  • 3 执行loadTable或loadPartition将临时目录数据move到正是目录;

对应的代码为:

org.apache.spark.sql.hive.execution.InsertIntoHiveTable

case class InsertIntoHiveTable(
    table: MetastoreRelation,
    partition: Map[String, Option[String]],
    child: SparkPlan,
    overwrite: Boolean,
    ifNotExists: Boolean) extends UnaryExecNode {
...
  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
...
    val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
    val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
...
    @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
    saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
...

  private def saveAsHiveFile(
      rdd: RDD[InternalRow],
      valueClass: Class[_],
      fileSinkConf: FileSinkDesc,
      conf: SerializableJobConf,
      writerContainer: SparkHiveWriterContainer): Unit = {
    assert(valueClass != null, "Output value class not set")
    conf.value.setOutputValueClass(valueClass)

    val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
    assert(outputFileFormatClassName != null, "Output format class not set")
    conf.value.set("mapred.output.format.class", outputFileFormatClassName)

    FileOutputFormat.setOutputPath(
      conf.value,
      SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName(), conf.value))
    log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
    writerContainer.driverSideSetup()
    sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
    writerContainer.commitJob()
  }

下面先看第一步创建临时目录过程,即getExternalTmpPath

  val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")

  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
    val extURI: URI = path.toUri
    if (extURI.getScheme == "viewfs") {
      getExtTmpPathRelTo(path.getParent, hadoopConf)
    } else {
      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
    }
  }

  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
  }

  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
    val inputPathUri: URI = inputPath.toUri
    val inputPathName: String = inputPathUri.getPath
    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
    val stagingPathName: String =
      if (inputPathName.indexOf(stagingDir) == -1) {
        new Path(inputPathName, stagingDir).toString
      } else {
        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
      }
    val dir: Path =
      fs.makeQualified(
        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
    try {
      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
        throw new IllegalStateException("Cannot create staging directory  '" + dir.toString + "'")
      }
      fs.deleteOnExit(dir)
    } catch {
      case e: IOException =>
        throw new RuntimeException(
          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)

    }
    return dir
  }

  private def executionId: String = {
    val rand: Random = new Random
    val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
    "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
  }

临时目录组成为【.hive-staging(配置hive.exec.stagingdir)】_【hive(硬编码)】_【2018-06-23_00-39-39_825(时分秒)】_【3122897139441535352(随机串)】_【2312(taskId)】/-ext-10000(硬编码)

下面看写文件过程,即

sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)

org.apache.spark.SparkContext

  /**
   * Run a job on all partitions in an RDD and return the results in an array.
   */
  def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.length)
  }

可见是将rdd逐个分区执行写入操作,rdd有多少个分区就会写入多少个文件,rdd是通过child.execute返回的,即SparkPlan.execute,下面看SparkPlan

org.apache.spark.sql.execution.SparkPlan

  final def execute(): RDD[InternalRow] = executeQuery {
    doExecute()
  }

  protected def doExecute(): RDD[InternalRow]

doExecute是抽象方法,执行计划中的过程都对应到SparkPlan的子类,比如Project对应ProjectExec,SortMergeJoin对应SortMergeJoinExec;

SparkPlan是由SparkPlanner生成的,下面看SparkPlanner:

org.apache.spark.sql.execution.SparkPlanner

  def numPartitions: Int = conf.numShufflePartitions

这里直接取的是SQLConf.numShufflePartitions,下面看SQLConf:

org.apache.spark.sql.internal.SQLConf

  val SHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.shuffle.partitions")
    .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
    .intConf
    .createWithDefault(200)

  def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)

这里取的是配置spark.sql.shuffle.partitions,默认200;那么分区数量是怎样用到的?下面看BasicOperators:

org.apache.spark.sql.execution.SparkStrategies.BasicOperators

    def numPartitions: Int = self.numPartitions

    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
...
      case logical.RepartitionByExpression(expressions, child, nPartitions) =>
        exchange.ShuffleExchange(HashPartitioning(
          expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil

可见shuffle过程会根据numPartitions来创建HashPartitioning,如果sql执行过程需要shuffle(比如有join,group by等操作),那么默认会写200个文件;如果sql执行过程没有shuffle,那么会由HiveTableScan和Filter等来决定写多少个文件;

也可以通过执行计划来看,如果有shuffle过程,执行计划中通常有这么一步:

      :  +- Exchange(coordinator id: 371605426) hashpartitioning(id#60, 200), coordinator[target post-shuffle partition size: 67108864]

其中hashpartitioning(id#60, 200)中的200就是spark.sql.shuffle.partitions的默认值;

附ShuffleExchange过程:

org.apache.spark.sql.execution.exchange.ShuffleExchange

  def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchange = {
    ShuffleExchange(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator])
  }

  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
    // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
    if (cachedShuffleRDD == null) {
      cachedShuffleRDD = coordinator match {
        case Some(exchangeCoordinator) =>
          val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
          assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
          shuffleRDD
        case None =>
          val shuffleDependency = prepareShuffleDependency()
          preparePostShuffleRDD(shuffleDependency)
      }
    }
    cachedShuffleRDD
  }

  /**
   * Returns a [[ShuffleDependency]] that will partition rows of its child based on
   * the partitioning scheme defined in `newPartitioning`. Those partitions of
   * the returned ShuffleDependency will be the input of shuffle.
   */
  private[exchange] def prepareShuffleDependency()
    : ShuffleDependency[Int, InternalRow, InternalRow] = {
    ShuffleExchange.prepareShuffleDependency(
      child.execute(), child.output, newPartitioning, serializer)
  }

  /**
   * Returns a [[ShuffledRowRDD]] that represents the post-shuffle dataset.
   * This [[ShuffledRowRDD]] is created based on a given [[ShuffleDependency]] and an optional
   * partition start indices array. If this optional array is defined, the returned
   * [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on indices of this array.
   */
  private[exchange] def preparePostShuffleRDD(
      shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
      specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
    // If an array of partition start indices is provided, we need to use this array
    // to create the ShuffledRowRDD. Also, we need to update newPartitioning to
    // update the number of post-shuffle partitions.
    specifiedPartitionStartIndices.foreach { indices =>
      assert(newPartitioning.isInstanceOf[HashPartitioning])
      newPartitioning = UnknownPartitioning(indices.length)
    }
    new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
  }

  /**
   * Returns a [[ShuffleDependency]] that will partition rows of its child based on
   * the partitioning scheme defined in `newPartitioning`. Those partitions of
   * the returned ShuffleDependency will be the input of shuffle.
   */
  def prepareShuffleDependency(
      rdd: RDD[InternalRow],
      outputAttributes: Seq[Attribute],
      newPartitioning: Partitioning,
      serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow] = {
    val part: Partitioner = newPartitioning match {
      case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
      case HashPartitioning(_, n) =>
        new Partitioner {
          override def numPartitions: Int = n
          // For HashPartitioning, the partitioning key is already a valid partition ID, as we use
          // `HashPartitioning.partitionIdExpression` to produce partitioning key.
          override def getPartition(key: Any): Int = key.asInstanceOf[Int]
        }
      case RangePartitioning(sortingExpressions, numPartitions) =>
        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
        // partition bounds. To get accurate samples, we need to copy the mutable keys.
        val rddForSampling = rdd.mapPartitionsInternal { iter =>
          val mutablePair = new MutablePair[InternalRow, Null]()
          iter.map(row => mutablePair.update(row.copy(), null))
        }
        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
        new RangePartitioner(numPartitions, rddForSampling, ascending = true)
      case SinglePartition =>
        new Partitioner {
          override def numPartitions: Int = 1
          override def getPartition(key: Any): Int = 0
        }
      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
      // TODO: Handle BroadcastPartitioning.
    }
    def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
      case RoundRobinPartitioning(numPartitions) =>
        // Distributes elements evenly across output partitions, starting from a random partition.
        var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
        (row: InternalRow) => {
          // The HashPartitioner will handle the `mod` by the number of partitions
          position += 1
          position
        }
      case h: HashPartitioning =>
        val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
        row => projection(row).getInt(0)
      case RangePartitioning(_, _) | SinglePartition => identity
      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
    }
    val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
      if (needToCopyObjectsBeforeShuffle(part, serializer)) {
        rdd.mapPartitionsInternal { iter =>
          val getPartitionKey = getPartitionKeyExtractor()
          iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
        }
      } else {
        rdd.mapPartitionsInternal { iter =>
          val getPartitionKey = getPartitionKeyExtractor()
          val mutablePair = new MutablePair[Int, InternalRow]()
          iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
        }
      }
    }

    // Now, we manually create a ShuffleDependency. Because pairs in rddWithPartitionIds
    // are in the form of (partitionId, row) and every partitionId is in the expected range
    // [0, part.numPartitions - 1]. The partitioner of this is a PartitionIdPassthrough.
    val dependency =
      new ShuffleDependency[Int, InternalRow, InternalRow](
        rddWithPartitionIds,
        new PartitionIdPassthrough(part.numPartitions),
        serializer)

    dependency
  }
原文地址:https://www.cnblogs.com/barneywill/p/10244446.html