用Spark做key排序

#直接调用sortByKey()函数就可以做到
from pyspark import SparkContext

# Sort a small in-memory list of keys with a single sortByKey() call.
sc = SparkContext('local', 'Sort')
# Renamed from `list`, which shadowed the Python builtin of the same name.
# These single-digit strings happen to sort the same lexicographically and
# numerically; multi-digit strings would not (see the file-based example).
numbers = ["7", "4", "8", "2", "5"]
textRDD = sc.parallelize(numbers).map(lambda word: (word, 1))
# One sortByKey() is enough: the original called it twice, which just repeated
# the sort (and its shuffle) on already-sorted data.
# collect() brings the sorted pairs back to the driver so they print in order;
# foreach(print) runs on the executors, where print order across partitions
# is not guaranteed (and output lands in executor logs on a cluster).
for pair in textRDD.sortByKey().collect():
    print(pair)
# Release the context; only one SparkContext may be active per process.
sc.stop()
sortByKey：对由 (key, value) 对组成的 RDD 按键（key）进行排序
# Excerpt of PySpark's RDD.sortByKey, quoted for reference only. The quote lost
# its indentation and appears to stop partway through the method (there is no
# return statement), so it is not runnable as written; see the PySpark RDD
# source for the complete method.
def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
"""
Sorts this RDD, which is assumed to consist of (key, value) pairs.
# noqa

>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortByKey().first()
('1', 3)
>>> sc.parallelize(tmp).sortByKey(True, 1).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortByKey(True, 2).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
>>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
>>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
"""
# Per the docstring examples, keys are ordered by keyfunc(key) (identity by
# default), so string keys sort lexicographically: '1' < '2' < 'a'.
if numPartitions is None:
numPartitions = self._defaultReducePartitions()  # NOTE(review): presumably the RDD's default shuffle partition count — confirm

memory = self._memory_limit()  # NOTE(review): presumably a memory budget for the sort in the omitted remainder — confirm
serializer = self._jrdd_deserializer
# ... remainder of the method is not included in this excerpt.
sortBy：自定义排序——先用 keyfunc 为每个元素计算排序键，再按该键排序，最后只保留原元素
def sortBy(self, keyfunc, ascending=True, numPartitions=None):
    """
    Sorts this RDD by the given keyfunc.

    Each element x is paired with keyfunc(x) via keyBy, the pairs are sorted
    with sortByKey, and values() then drops the computed keys again, leaving
    the original elements in sorted order.

    :param keyfunc: function computing the sort key of each element
    :param ascending: sort ascending when True (default), descending otherwise
    :param numPartitions: number of partitions for the sort; passed straight
        through to sortByKey (None lets sortByKey choose)
    :return: an RDD of the original elements ordered by keyfunc

    >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
    [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
    [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    """
    return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
 
 
问题：当 RDD 是从本地文本文件读取创建时，sortByKey 的排序结果没有按数值大小排列
实验数据:
      18
      2    
      89
      34
      67
      892
      34
      34
代码:
  
from pyspark import SparkContext

# Sort the numbers in a local text file by numeric value with sortByKey().
sc = SparkContext('local', 'Sort')
textFile = sc.textFile("file:///usr/local/spark/mycode/TestPackage/sort.txt")
# split() with no argument splits on any run of whitespace and drops empty
# strings, so padded or blank lines yield no '' tokens (split(" ") would).
# int() makes the keys numeric: textFile() yields strings, and sortByKey() on
# strings sorts lexicographically ('18' < '2' < '34' ...), which was the bug.
# NOTE: int() raises ValueError on any non-numeric token in the file.
textRDD = textFile.flatMap(lambda line: line.split()).map(lambda word: (int(word), 1))
# collect() returns the sorted pairs to the driver so they print in order;
# foreach(print) would print from the executors without a global order.
for pair in textRDD.sortByKey().collect():
    print(pair)
# Release the context; only one SparkContext may be active per process.
sc.stop()
 
效果（'2' 排在了 '18' 之后：textFile 读入的内容都是字符串，sortByKey 按字符串字典序比较键；要按数值排序，需先用 int() 把键转换为整数）:
('18', 1)
('2', 1)
('34', 1)
('34', 1)
('34', 1)
('67', 1)
('89', 1)
('892', 1)
 
    
 
原文地址:https://www.cnblogs.com/SoftwareBuilding/p/9400552.html