Problems encountered with Spark SQL window functions

While using the window functions in org.apache.spark.sql.functions, I ran into several tricky problems. After much searching and repeated experiments, I finally found the solutions.

First, consider an example:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, Row}
import org.apache.spark.sql.types._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object WindowQueryTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("WndFnc_demo").setMaster("local"))
    val hiveContext = new HiveContext(sc)

    // Sample data: (key, val) pairs, with the element index kept as the ordering column.
    val data = Seq(("A", 4), ("C", 1), ("D", 1), ("B", 2), ("B", 2), ("D", 4), ("A", 1), ("B", 4))
    val withRowNumbers: Seq[(String, Int, Int)] = data.zipWithIndex.map(e => (e._1._1, e._1._2, e._2))

    val rdd: RDD[Row] = sc.parallelize(withRowNumbers).map(triplet => Row(triplet._1, triplet._2, triplet._3))

    // Recreate the Hive table and append the sample rows.
    hiveContext.sql("DROP TABLE IF EXISTS delme")

    hiveContext.sql( """CREATE TABLE `delme`(
                      `key`  string,
                      `val`  int,
                      `ord`  int)""")
    val schema = StructType(Seq(StructField("key", StringType),
      StructField("val", IntegerType), StructField("ord", IntegerType)))
    hiveContext.createDataFrame(rdd, schema).write.mode(SaveMode.Append).saveAsTable("delme")

    // Window functions: MAX/MIN over a partition, row_number and rank over an ordering.
    val qRes = hiveContext.sql("""SELECT key, val
                                        ,MAX(val) OVER (PARTITION BY key) mx
                                        ,MIN(val) OVER (PARTITION BY key) mn
                                        ,row_number() OVER (ORDER BY ord DESC) revord
                                        ,rank() OVER (ORDER BY val) rnk
                                    FROM delme""")
    qRes.collect().foreach(println)
  }
}

1. Initialization requires a HiveContext

If you initialize a SQLContext instance instead:

val sqlContext = new SQLContext(sc)

an error is thrown stating that a HiveContext is required:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
……

HiveContext extends SQLContext:

class HiveContext(sc : org.apache.spark.SparkContext) extends org.apache.spark.sql.SQLContext with org.apache.spark.Logging
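The fix is simply to construct a HiveContext; a minimal sketch, reusing sc and the delme table from the example above:

import org.apache.spark.sql.hive.HiveContext

// A HiveContext (rather than a plain SQLContext) lets the window functions resolve;
// because HiveContext extends SQLContext, the rest of the code needs no changes.
val hiveContext = new HiveContext(sc)
hiveContext.sql("SELECT key, row_number() OVER (ORDER BY ord DESC) revord FROM delme").show()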

2. Add the three jar files from spark/lib as external library dependencies

The project's External Libraries must include the following three jar files: datanucleus-api-jdo, datanucleus-core and datanucleus-rdbms:

[screenshot: External Libraries listing the three datanucleus jars]

When the project is compiled and run, the metastore_db folder and derby.log file are generated automatically.

[screenshot: metastore_db folder and derby.log generated in the project directory]

Otherwise, error messages like the following appear:

16/01/18 15:40:07 WARN Persistence: Error creating validator of type org.datanucleus.properties.CorePropertyValidator
ClassLoaderResolver for class "" gave error on creation : {1}
org.datanucleus.exceptions.NucleusUserException: ClassLoaderResolver for class "" gave error on creation : {1}
……
16/01/18 15:40:07 WARN HiveMetaStore: Retrying creating default database after error: Unexpected exception caught.
javax.jdo.JDOFatalInternalException: Unexpected exception caught.
……
16/01/18 15:40:07 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/01/18 15:40:07 INFO ObjectStore: ObjectStore, initialize called
16/01/18 15:40:07 WARN Persistence: Error creating validator of type org.datanucleus.properties.CorePropertyValidator
ClassLoaderResolver for class "" gave error on creation : {1}
org.datanucleus.exceptions.NucleusUserException: ClassLoaderResolver for class "" gave error on creation : {1}
……
16/01/18 15:40:07 WARN Hive: Failed to access metastore. This class should not accessed in runtime.
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
……
16/01/18 15:40:07 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/01/18 15:40:07 INFO ObjectStore: ObjectStore, initialize called
16/01/18 15:40:07 WARN Persistence: Error creating validator of type org.datanucleus.properties.CorePropertyValidator
ClassLoaderResolver for class "" gave error on creation : {1}
org.datanucleus.exceptions.NucleusUserException: ClassLoaderResolver for class "" gave error on creation : {1}
……
16/01/18 15:40:07 WARN HiveMetaStore: Retrying creating default database after error: Unexpected exception caught.
javax.jdo.JDOFatalInternalException: Unexpected exception caught.
……
16/01/18 15:40:07 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
16/01/18 15:40:07 INFO ObjectStore: ObjectStore, initialize called
16/01/18 15:40:07 WARN Persistence: Error creating validator of type org.datanucleus.properties.CorePropertyValidator
ClassLoaderResolver for class "" gave error on creation : {1}
org.datanucleus.exceptions.NucleusUserException: ClassLoaderResolver for class "" gave error on creation : {1}
……
Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
……

These three jar files are found in spark/lib.
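If the project is built with sbt rather than by adding the jars manually in the IDE, one option is to pull them in as unmanaged jars. The following is only a sketch: the SPARK_HOME lookup, default path, and glob pattern are assumptions to be adjusted to your own layout.

// build.sbt -- pick up the three DataNucleus jars from the local Spark distribution
unmanagedJars in Compile ++= {
  val sparkLib = file(sys.env.getOrElse("SPARK_HOME", "/opt/spark")) / "lib"
  (sparkLib * "datanucleus-*.jar").classpath
}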

3. Set the JVM options (JAVA_OPTS) in the run configuration (FATAL!)

At this point everything looks OK. But when the program is compiled and run, it exits abnormally; only an exception in the main thread is reported at the very end, with no ERROR anywhere, which makes the real cause very hard to find.

……
Exception in thread "main" 
Process finished with exit code 1

Running it several times eventually produces the exception below; the key point is PermGen space (the size of the permanent generation).

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
    at org.apache.spark.sql.hive.client.IsolatedClientLoader.<init>(IsolatedClientLoader.scala:179)
    at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
    at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
    at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
    at org.apache.spark.sql.hive.HiveContext.defaultOverrides(HiveContext.scala:174)
    at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:177)
    at WindowQueryTest$.main(WindowQueryTest.scala:14)
    at WindowQueryTest.main(WindowQueryTest.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.OutOfMemoryError: PermGen space
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1236)
    at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
    at org.apache.hadoop.hive.ql.metadata.Hive.<clinit>(Hive.java:166)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
    at org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:171)
    ... 18 more

Process finished with exit code 1

Compiling and running again may produce even longer exception messages, and the errors may vary, but the root cause never changes: the size of the PermGen space.

Solution: add the following JVM options to the Run Configuration: -server -Xms512M -Xmx1024M -XX:PermSize=256M -XX:MaxNewSize=512M -XX:MaxPermSize=512M

[screenshot: IntelliJ Run Configuration with the JVM options above]

Each parameter can be adjusted to suit the machine's actual configuration.
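If the program is launched through sbt instead of the IDE run configuration, the equivalent settings can go into build.sbt. This is a sketch under the same assumptions: the values mirror the options above and should be tuned the same way, and fork must be enabled so the options apply to the forked run JVM rather than to sbt itself.

// build.sbt -- apply the same JVM options when running via `sbt run`
fork in run := true
javaOptions in run ++= Seq(
  "-server", "-Xms512M", "-Xmx1024M",
  "-XX:PermSize=256M", "-XX:MaxNewSize=512M", "-XX:MaxPermSize=512M"
)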

4. Specifying the window with WindowSpec

Now look at another example:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object WindowFunctions {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Window Functions").setMaster("local")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

    import hiveContext.implicits._

    val l = (1997, 1) :: (1997, 4) :: (1998, 2) :: (1998, 3) :: (1999, 9) :: Nil
    val df = sc.parallelize(l).toDF("k", "v")

    // Base window: ordered by k, no partitioning.
    val w = Window.orderBy($"k")
    val df1 = df.withColumn("No", rowNumber().over(w))

    // Row frame: the current row and the two preceding rows.
    val rowW = w.rowsBetween(-2, 0)
    // Range frame: rows whose k is at most 1 below the current row's k.
    val rangeW = w.rangeBetween(-1, 0)
    df1.withColumn("row", avg($"v").over(rowW)).withColumn("range", avg($"v").over(rangeW)).show
    sc.stop()
  }
}

The result:

[screenshot: resulting DataFrame with columns k, v, No, row, range]


org.apache.spark.sql.expressions.Window defines a WindowSpec and specifies its partitioning and/or ordering.

@org.apache.spark.annotation.Experimental
object Window extends scala.AnyRef {
  @scala.annotation.varargs
  def partitionBy(colName : scala.Predef.String, colNames : scala.Predef.String*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def partitionBy(cols : org.apache.spark.sql.Column*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def orderBy(colName : scala.Predef.String, colNames : scala.Predef.String*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def orderBy(cols : org.apache.spark.sql.Column*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
}

A WindowSpec can then call rowsBetween or rangeBetween to set the offsets that define the window frame, and it can even redefine the partitioning and ordering. Note that rowsBetween counts physical rows relative to the current row (rowsBetween(-2, 0) covers the current row and the two preceding rows), whereas rangeBetween works on the value of the ordering column (rangeBetween(-1, 0) covers every row whose ordering value is at most 1 below the current row's value).

@org.apache.spark.annotation.Experimental
class WindowSpec private[sql] (partitionSpec : scala.Seq[org.apache.spark.sql.catalyst.expressions.Expression], orderSpec : scala.Seq[org.apache.spark.sql.catalyst.expressions.SortOrder], frame : org.apache.spark.sql.catalyst.expressions.WindowFrame) extends scala.AnyRef {
  @scala.annotation.varargs
  def partitionBy(colName : scala.Predef.String, colNames : scala.Predef.String*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def partitionBy(cols : org.apache.spark.sql.Column*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def orderBy(colName : scala.Predef.String, colNames : scala.Predef.String*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  @scala.annotation.varargs
  def orderBy(cols : org.apache.spark.sql.Column*) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  def rowsBetween(start : scala.Long, end : scala.Long) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  def rangeBetween(start : scala.Long, end : scala.Long) : org.apache.spark.sql.expressions.WindowSpec = { /* compiled code */ }
  private[sql] def withAggregate(aggregate : org.apache.spark.sql.Column) : org.apache.spark.sql.Column = { /* compiled code */ }
}

Finally, the required columns are obtained by applying concrete window functions over the WindowSpec.
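As a sketch, the SQL query from the first example can be expressed the same way with WindowSpec and the DataFrame API (reusing the hiveContext and the delme table from above; rowNumber and rank are the Spark 1.x function names):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import hiveContext.implicits._

// One WindowSpec per window clause in the SQL query.
val byKey = Window.partitionBy($"key")
val byOrdDesc = Window.orderBy($"ord".desc)
val byVal = Window.orderBy($"val")

hiveContext.table("delme")
  .withColumn("mx", max($"val").over(byKey))
  .withColumn("mn", min($"val").over(byKey))
  .withColumn("revord", rowNumber().over(byOrdDesc))
  .withColumn("rnk", rank().over(byVal))
  .show()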

References:

[1] https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

[2] http://www.cnblogs.com/mingforyou/archive/2012/03/03/2378143.html

[3] http://sonra.io/window-functions-aka-analytic-functions-in-spark/

END

Original post: https://www.cnblogs.com/kevingu/p/5140242.html