pyspark的排序

一、count

sql = """select video_id,count(video_id) as video_num from video_table group by video_id order by video_num desc"""
rdd = spark.sql(sql).rdd.map(lambda x: x["video_id"])
result = rdd.collect()

二、sortBy和sortByKey

from operator import add
sql = """select video_id from video_table """
rdd = spark.sql(sql).rdd.map(lambda x: (x["video_id"],1))..reduceByKey(add)
rdd1 = rdd.sortBy(lambda x: x[1], ascending=False)
rdd2 = rdd.sortByKey(lambda x: x, ascending=False)
result = rdd1.collect() # rdd2.collect()

1、sortBy如何实现全局排序

sortBy实际上调用sortByKey

def sortBy(self, keyfunc, ascending=True, numPartitions=None):
      return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()

2、sortBy的实现过程:

Stage 0:Sample。创建 RangePartitioner,先对输入的数据的key做sampling来估算key的分布情况,然后按指定的排序切分出range,尽可能让每个partition对应的range里的key的数量均匀。计算出来的 rangeBounds 是一个长为 numPartitions - 1 的list,记录头 numPartitions - 1 个partition对应的range的上界;最后一个partition的边界就隐含在“剩余”当中。

 rddSize = self.count() # 统计rdd中包含的元素个数,假设rddSize=10000
 if not rddSize:
    return self  # empty RDD
maxSampleSize = numPartitions * 20.0  # 假设有4个分区,maxSampleSize=80
fraction = min(maxSampleSize / max(rddSize, 1), 1.0) # fraction=8/1000,
samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect() # 采样 8/1000,根据采样出的数据来估算key的分布情况。
samples = sorted(samples, key=keyfunc) # 对采样得到的rdd collect之后得到的列表,调用python的sorted方法,完成从小到大排序,得到排好序的列表。
# 得到numPartition-1=3个边界列表。
bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
                  for i in range(0, numPartitions - 1)]
# partitionBy根据给定的3个边界进行分区,分区之后分区间的元素是排好序的。再调用mapPartitions,对每个分区的数据进行排序
def rangePartitioner(k):
    p = bisect.bisect_left(bounds, keyfunc(k))
    if ascending:
        return p
    else:
        return numPartitions - 1 - p

return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)

Stage 1:Shuffle Write。开始shuffle,在map side做shuffle write,根据前面计算出的rangeBounds来重新partition。
通过key值和区间边界进行比较,如果位于改区间,则分配到该区间对应的分区。
Shuffle write出的数据中,每个partition内的数据虽然尚未排序,但partition之间已经可以保证数据是按照partition index排序的了。
Stage 2:Shuffle Read。然后到reduce side,每个reducer再对拿到的本partition内的数据做排序。这样完成之后,partition之间的数据在map side就保证有排序,而每个partition内的数据在reduce side也保证有排序,就达到了全局排序的效果。如果在 sortByKey() 后面跟一个 collect() 调用,则它会按照partition index的顺序获取结果数据,最后再把这些数组合并起来,就在本地得到了全局排序后的大数组。

三、调用python方法

from collections import Counter
def category_trans(x):
    """
    统计每个分类下面视频出现的次数
    :param x:
    :return:
    """
    tag = x[0]
    videos = x[1]
    result = Counter(videos)
    r = sorted(result.items(), key=lambda item: item[1],reverse=True)
    return tag,[item[0] for item in r]
rdd = rdd.map(lambda x:(x["tag"],[x["video_id"]])) # 此时rdd内的数据为[(tag1:[video_1]),(tag1,[video_2]),(tag2,[video_1]),...]
video_rdd = rdd.reduceByKey(lambda x,y: x+y) # [(tag1,[v1,v2,...]),...]
t2v = video_rdd.map(lambda x: category_trans(x))  # [(tag1,[排好序的列表]),...]
result = t2v.collectAsMap()

四、自定义类

将rdd中元素转换为自定义类的实例

class MySort():
    """
    自定义类的__lt__()方法。python的类中已经自带了lt,eq,ge,gt...等方法
    """
    def __init__(self,num):
        self.num = num
    def __lt__(self,other):
        return self.num<other.num

    def __repr__(self):
        return str(self.num)

rdd = sc.parallelize([(1, 1),(1, 2), (-1, 1), (-1, -0.5)])
rdd = rdd.map(MySort)
rdd = rdd.sortBy(lambda x: x,ascending=False)
rdd = rdd.foreach(lambda x: print(x))

参考资料

1、(Spark排序的原理? - RednaxelaFX的回答 - 知乎

https://www.zhihu.com/question/34771277/answer/187001059)

原文地址:https://www.cnblogs.com/leimu/p/15543606.html