Spark DataFrame 添加索引列

若DataFrame为简单的二维表,则可以借助RDD的zipWithIndex实现索引列添加。

scala> val df = spark.createDataFrame(Seq(("ming",20,1234),("hong",19,1235),("zhi",21,1236)))

scala> df.show
+----+---+----+
|  _1| _2|  _3|
+----+---+----+
|ming| 20|1234|
|hong| 19|1235|
| zhi| 21|1236|
+----+---+----+


scala> val rdd = df.rdd.zipWithIndex()
rdd: org.apache.spark.rdd.RDD[(org.apache.spark.sql.Row, Long)] = ZippedWithIndexRDD[5] at zipWithIndex at <console>:25

scala> rdd.collect.foreach(println)
([ming,20,1234],0)
([hong,19,1235],1)
([zhi,21,1236],2)

若DataFrame来源于JSON格式数据,直接通过rdd.zipWithIndex实现索引列添加,会报如下错误:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.Row
- field (class: "org.apache.spark.sql.Row", name: "_1")
- root class: "scala.Tuple2"
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:650)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
        at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
        at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
        at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
        at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:644)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1$$anonfun$8.apply(ScalaReflection.scala:632)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:355)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:632)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor$1.apply(ScalaReflection.scala:452)
        at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
        at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:906)
        at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:46)
        at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:452)
        at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:441)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
        at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
        at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
        at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
        at com.ipinyou.mip.ma.worker.ABTestProcessor$.getAudienceByNumber(ABTestProcessor.scala:69)
        at com.ipinyou.mip.ma.worker.ABTestProcessor$.execute(ABTestProcessor.scala:47)
        at com.ipinyou.mip.ma.worker.ABTestProcessor$.main(ABTestProcessor.scala:92)
        at com.ipinyou.mip.ma.worker.ABTestProcessor.main(ABTestProcessor.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

解决办法是,选择DataFrame中数据维度单一的列,转成rdd后使用zipWithIndex,最后将此DataFrame与原始DataFrame做join,这个操作的局限是所选择的列必须不能存在重复值。

audience数据示例:

val indexColName = "index"
val cdpId = audience.select(cdpIdColName).rdd.map(_.getString(0)).zipWithIndex().toDF(cdpIdColName, indexColName)
val audienceWithIndex = audience.join(cdpId, Seq(cdpIdColName), "left")
原文地址:https://www.cnblogs.com/144823836yj/p/14120082.html