Using UDFs in PySpark

Official documentation:
https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

I. Overview

When you work with Hive through PySpark, it is easy to use UDFs (user-defined functions).

II. Examples

1. Create a table and load data

from os.path import abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH './kv1.txt' INTO TABLE src")
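If kv1.txt is not at hand, a small stand-in file can be generated for experimenting. The sketch below assumes that fields are separated by the Ctrl-A character ("\x01"), which is Hive's default field delimiter for text tables; the helper name `write_sample_kv` and the sample keys are hypothetical and not part of the original article.

```python
from pathlib import Path

# Assumption: one record per line, with key and value separated by Ctrl-A.
FIELD_DELIMITER = "\x01"


def write_sample_kv(path, keys):
    """Write one 'key<Ctrl-A>val_key' line per key and return the path."""
    lines = [f"{k}{FIELD_DELIMITER}val_{k}" for k in keys]
    target = Path(path)
    target.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return target
```

Calling `write_sample_kv("./kv1.txt", [238, 86, 311])` would then produce a three-row file that the LOAD DATA statement can pick up.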

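If you run this on Windows 10, after copying the data file into place you need to change the permissions of kv1.txt so that the program can read it.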
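One way to check and adjust the permissions from Python is sketched below. It assumes the problem is the file's permission bits; the helper name `ensure_readable` is hypothetical. On Windows, `os.chmod` can only toggle the read-only flag, so ACL-based restrictions may still have to be changed through the file's Properties dialog.

```python
import os
import stat


def ensure_readable(path):
    """Add the owner-read bit if the current process cannot read `path`.

    Returns True if the file is readable afterwards.
    """
    if not os.access(path, os.R_OK):
        mode = os.stat(path).st_mode
        os.chmod(path, mode | stat.S_IRUSR)
    return os.access(path, os.R_OK)
```

Running `ensure_readable("./kv1.txt")` before the LOAD DATA statement gives an early signal if the file still cannot be read.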

2. Some queries

# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key|  value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...

# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# |    500 |
# +--------+

# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...

# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# |  2| val_2|  2| val_2|
# |  4| val_4|  4| val_4|
# |  5| val_5|  5| val_5|

3. UDF

Requirement: return the square of a column's value.

1. Write the UDF

def func_two(key):
    return key*key
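Because a UDF is an ordinary Python function, it can be checked without a Spark session before it is registered. The sketch below also shows a variant that passes None through, since a SQL NULL reaches a Python UDF as None and `None * None` raises a TypeError. The name `func_two_null_safe` is hypothetical and not part of the original article.

```python
def func_two(key):
    return key * key


def func_two_null_safe(key):
    """Square `key`, passing None (SQL NULL) through unchanged."""
    if key is None:
        return None
    return key * key


# Plain-Python checks that need no Spark session.
print(func_two(12))              # 144
print(func_two_null_safe(None))  # None
```

Whether the null-safe variant is needed depends on whether the column can contain NULL values.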

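2. Register the UDF

register takes three arguments: the name under which the UDF is registered, the original function, and the function's return type (a type from pyspark.sql.types, such as IntegerType()).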

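from pyspark.sql.types import IntegerType
spark.udf.register("func_two", func_two, IntegerType())
sc = spark.sparkContext
from pyspark.sql import HiveContext
# Create a HiveContext on top of the existing SparkContext
# (HiveContext has been deprecated since Spark 2.0)
hc = HiveContext(sc)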

3. Use the UDF

hc.sql("SELECT func_two(key) as key,value FROM src").collect()
Original article: https://www.cnblogs.com/leimu/p/14846438.html