spark自定义函数之——UDF使用详解及代码示例

前言

本文介绍如何在Spark Sql和DataFrame中使用UDF,如何利用UDF给一个表或者一个DataFrame根据需求添加几列,并给出了旧版(Spark1.x)和新版(Spark2.x)完整的代码示例。

关于UDF:UDF:User Defined Function,用户自定义函数

创建测试用DataFrame

spark2.0创建DataFrame

// 构造测试数据,有两个字段、名字和年龄
val userData = Array(("A", 16), ("B", 21), ("B", 14), ("B", 18))

//创建测试df
val userDF = spark.createDataFrame(userData).toDF("name", "age")
userDF.show
+-----+---+
| name|age|
+-----+---+
| A | 16|
| B | 21|
| C | 14|
| D | 18|
+-----+---+
// 注册一张user表 
userDF.createOrReplaceTempView("user")

spark1.0创建DataFrame

 // 构造测试数据,有两个字段、名字和年龄
val userData = Array(("A", 16), ("B", 21), ("C", 14), ("D", 18))
//创建测试df
val userDF = sc.parallelize(userData).toDF("name", "age")
// 注册一张user表
 userDF.registerTempTable("user")

spark-sql中SQL中UDF用法

1. 通过匿名函数注册UDF

下面的UDF的功能是计算某列的长度,该列的类型为String

// Spark2.x:
spark.udf.register("strLen", (str: String) => str.length())

// Spark1.x:
sqlContext.udf.register("strLen", (str: String) => str.length())

// 仅以Spark2.x为例
spark.sql("select name,strLen(name) as name_len from user").show

2. 通过实名函数注册UDF

实名函数的注册有点不同,要在后面加 _(注意前面有个空格) 

// 定义一个实名函数

/**
 * 根据年龄大小返回是否成年 成年:true,未成年:false
*/
def isAdult(age: Int) = {
  if (age < 18) {
    false
  } else {
    true
  }
}

// 注册(仅以Spark2.x为例)
spark.udf.register("isAdult", isAdult _)

spark-sql中DataFrame中UDF用法

DataFrame的udf方法虽然和Spark Sql的名字一样,但是属于不同的类,它在org.apache.spark.sql.functions里,下面是它的用法

1. 注册

import org.apache.spark.sql.functions._
//方法一:注册自定义函数(通过匿名函数) val strLen = udf((str: String) => str.length()) //方法二:注册自定义函数(通过实名函数) val udf_isAdult = udf(isAdult _)

2. 使用

可通过withColumn和select使用,下面的代码已经实现了给user表添加两列的功能 
* 通过看源码,下面的withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究

// 通过withColumn添加列
userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show

//通过select添加列
userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show

+-----+---+--------+-------+
| name|age|name_len|isAdult|
+-----+---+--------+-------+
| A | 16| 3| false|
| B | 21| 5| true|
| C | 14| 4| false|
| D | 18| 3| true|
+-----+---+--------+-------+

withColumn和select的区别

可通过withColumn的源码看出withColumn的功能是实现增加一列,或者替换一个已存在的列,他会先判断DataFrame里有没有这个列名,如果有的话就会替换掉原来的列,没有的话就用调用select方法增加一列,所以如果我们的需求是增加一列的话,两者实现的功能一样,且最终都是调用select方法,但是withColumn会提前做一些判断处理,所以withColumn的性能不如select好。 

注:select方法和sql 里的select一样,如果新增的列名在表里已经存在,那么结果里允许出现两列列名相同但数据不一样,大家可以自己试一下。

 参考:https://dongkelun.com/2018/08/02/sparkUDF/

原文地址:https://www.cnblogs.com/yyy-blog/p/10280657.html