Spark词频统计,求TOP值,文件排序,二次排序

RDD操作

词频统计

line = sc.textFile("file:///usr/local/spark/word.txt")
wordCount = lines.flatMap(lambda line: line.split(" ").map(lambda word: (word,1)).reduceByKey(lambda a,b: a + b)
print(wordCount.collect())
//[('good',1),('Spark',2),('hadoop',1)]
//.collect()以数组形式将结果集缓存磁盘

Pair RDD操作

计算每种图书的每天平均销量

rdd = sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)])
rdd.mapValues(lambda x : (x,1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).mapValues(lambda x:x[0]/x[1]).collect()
//输出结果[('hadoop',5.0),('spark',4.0)]

求TOP值

file文件有4列,求第三列前N个TOP值。

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("TOP")
sc = SparkContext(conf = conf)
lines = sc.textFile("file:///usr/local/spark/mycode/rdd/file")
//strip()消去空格,result1把数据没有4项的过滤掉
result1 = lines.filter(lambda line:(len(line.strip()) > 0) and (len(line.split(","))==4))
//result2取第三列的数据
result2 = result1.map(lambda x:x.split(",")[2])
//result3转化为键值对(x,""),转化成键值对才能排序
result3 = result2.map(lambda x:(int(x),""))
//result4分成1个分区,否则是各自分区排序
result4 = result3.repartition(1)
//result5对key排序
result5 = result4.sortByKey(False)
//result6取键值对的key
result6 = result5.map(lambda x:x[0])
//result7取前5个
result7 = result6.take(5)
for a in result7:
   print(a)

文件排序

读取文件中所有整数进行排序

from pyspark import SparkConf,SparkContext

index = 0
def getindex():
  global index
  index+=1
  return index
  
def main():
  conf = SparkConf().setMaster("local[1]").setAppName("FileSort")
  sc = SparkContext(conf = conf)
  lines = sc.textFile("file:///usr/local/spark/mycode/rdd/filesor/file*.txt")
  index = 0
  //过滤空行
  result1 = lines.filter(lambda line:(len(line.strip()) > 0))
  result2 = result1.map(lambda x:(int(x.strip()),""))
  result3 = result2.repartition(1)
  result4 = result3.sortByKey(True)
  result5 = result4.map(lambda x:x[0])
  result6 = result5.map(lambda x:(getindex(),x))
  result6.foreach(print)
  result6.saveAsTextFile("file:///usr/local/spark/mycode/rdd/filesort/sortresult")
if__name__=='__main__':
  main()

二次排序

对于一个给定的文件(有两列整数),请对数据进行排序,首先根据第一列数据降序排序,如果第一列数据相等,则根据第二列数据降序排序

  • 按照Ordered和Serializable接口实现自定义排序的key
  • 将要进行二次排序的文件加载进来生成<key,value>类型的RDD
  • 使用sortByKey基于自定义的key进行二次排序
  • 去掉排序的key只保留排序的结果
from pyspark import SparkConf,SparkContext
from operator import gt

class SecondarySortKey():
  def__init__(self,k):
    self.column1 = k[0]
    self.column2 = k[1]
  
  def__gt__(self, other):
    if other.column1 ==self.column1:
      return gt(self.column2,other.column2)
    else:
      return gt(self.column1,other.column1)

def main():
  conf = SparkConf().setAppName('spark_sort').setMaster('local[1]')
  sc = SparkConf(conf = conf)
  file = "file:///usr/local/spark/mycode/secondarysort/file.txt"
  rdd1 = sc.textFile(file)
  rdd2 = rdd1.filter(lambda x:len(x.strip()>0))
  rdd3 = rdd2.map(lambda x:((int(x.split(" ")[0]),int(x.split(" ")[1])),x))
  rdd4 = rdd3.map(lambda x: (SecondarySortKey(x[0]),x[1]))
  rdd5 = rdd4.sortByKey(False)
  rdd6 = rdd5.map(lambda x:x[1])
  rdd6.foreach(print)
  
if__name__=='__main__':
  main()
所有案例代码教学均来自于厦门大学数据库实验室。
原文地址:https://www.cnblogs.com/chenshaowei/p/12435335.html