spark DataFrame新增一列id列(单调递增,不重复)的几种方法

方案一:使用functions里面的monotonically_increasing_id(),生成单调递增,不保证连续,最大64bit,的一列.分区数不变。

import org.apache.spark.sql.functions._
val df1 = spark.range(0,1000).toDF("col1")
val df2 = df1.withColumn("id", monotonically_increasing_id())

注意:有多个分区的时候,每个分区里面是单调递增,step为1,分区之间不保证连续,如一共两个分区,0分区id是0-499,1分区id可能99000-99499,甚至更大,最大64bit的integer。

如果想要整体连续,可以先repartition(1),操作完后在repartition(n)

方案二:使用row_number().over(Windo.orderBy(ColName)),生成按某列排序后,新增单调递增,连续的一列。操作完后分区数变为1,id列从1开始。

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
 
val df1 = spark.range(0,1000).toDF("col1")
 
println(df.rdd.getNumPartitions)
 
val w = Window.orderBy("col1")
val df2 = df.withColumn("id", row_number().over(w))
 
println(df2.rdd.getNumPartitions

方案三:将DataFrame转成RDD,使用RDD的方法zipWithIndex()/zipWithUniqueId(),分区数不变。

val df1: DataFrame = spark.range(0,1000000).toDF("col1")
 
 
//转成rdd并使用zipWithIndex()
var tempRDD: RDD[(Row, Long)] = df1.rdd.zipWithIndex()
//使用map
val record: RDD[Row] = tempRDD.map(x => {
      Row(x._1.get(0), x._2)
    })
 
val schema= new StructType().add("col1","long")
        .add("id","long")
spark.createDataFrame(record,schema).show()

zipWithIndex():首先基于分区索引排序,然后是每个分区中的项的排序。所以第一个分区中的第一项得到索引0,最后一个分区中的最后一项得到最大的索引。从0开始

zipWithUniqueId(): 每个分区是一个等差数列,等差为分区数n,每个分区的第一个值为分区id(id从0开始)。第k个分区:num*n+k,num在每个分区都是从0开始,step为1

3个分区,abc 0分区   def  1分区   ghi  2分区
col1         id
a         0*3+0=0
b         1*3+0=3
c         2*3+0=6
d         0*3+1=1
e         1*3+1=4
f         2*3+1=7
g         0*3+2=2
h         1*3+2=5
i         2*3+2=8

  


原文链接:https://blog.csdn.net/liaodaoluyun/article/details/86232639

原文地址:https://www.cnblogs.com/yyy-blog/p/12627856.html