spark学习四


5.sc.textFiles() 与 sc.wholeTextFiles() 的区别

sc.textFile()是将path 里的所有文件内容读出,以文件中的每一行作为一条记录的方式,文件的每一行 相当于 列表 的一个元素,因此可以在每个partition中用for i in data的形式遍历处理数据。
sc.wholeTextFiles()返回的是[(key, val), (key, val)...]的形式,其中key是文件路径,val是文件内容,每个文件作为一个记录!这说明这里的 val 将不再是 list 的方式为你将文件每行拆成一个 list的元素,
而是将整个文本的内容以字符串的形式读进来,也就是说val = '...line1... ...line2... '
这时需要你自己去拆分每行!而如果你还是用for i in val的形式来便利 val那么i得到的将是每个字符。
6.filter方法过滤集合中的元素

首先你需要给filter方法一个判断条件或者返回true/false的函数,这个判断条件(函数)的输入类型要与集合元素类型一致,
返回值是布尔型的。filter方法会对集合的每一个元素调用判断条件,当条件为true的时候则元素进入新的集合否则会被过滤掉。
你还需要使用一个变量来指向新的集合
过滤偶数:
val x=List.range(1,10)
x:List[Int]=List(1,2,3,4,5,6,7,8,9)
val evens =x.filter(_%2==0)
evens:List[Int]=List(2,4,6,8)
val evens=x.filterNot(_%2==0)
evens:List[Int]=List(1,3,5,7,9)
filter方法可以便利整个集合,但是其他方法只是遍历一部分元素
filter方法允许提供一个判断条件(函数),过滤集合元素
当判断逻辑复杂时,没办法一行写完,可以在filter内部使用多行的判断逻辑,也可以定义一个判断函数,filter(panduan)
也可以连续使用filter方法
io.Source.formFile(canFilename)
.toList
.filter(_.trim !="")
.filter(_.charAt(0) !='#')
7.mapValues(func)

功能:对键值对每个value都应用一个函数,但是,key不会发生变化。
val list = List("hadoop","spark","hive","spark")
val rdd = sc.parallelize(list)
val pairRdd = rdd.map(x => (x,1))
pairRdd.mapValues(_+1).collect.foreach(println)//对每个value进行+1
结果
(hadoop,2)
(spark,2)
(hive,2)
(spark,2)
 
 
完成了实验四RDD编程初级实践
2.编写独立应用程序实现数据去重
对于两个输入文件 A 和 B,编写 Spark 独立应用程序,对两个文件进行合并,并剔除其 中重复的内容,得到一个新文件 C。下面是输入文件和输出文件的一个样例,供参考。
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf


object exercise{

         def main(args: Array[String]) {  
              val conf = new SparkConf().setAppName("RemDup")
         val sc = new SparkContext(conf)
         val dataFile1 = "file:///usr/local/spark/mycode/exercise42/text1.txt,file:///usr/local/spark/mycode/exercise42/text2.txt"
        
         val data = sc.textFile(dataFile1,2)
         val da = data.distinct()
         da.foreach(println)

}
}

  

3.编写独立应用程序实现求平均值问题
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生 名字,第二个是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到 一个新文件中。下面是输入文件和输出文件的一个样例,供参考

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object wordcount{
    def main(args:Array[String]){
         val inputfile="file:///usr/local/spark/mycode/exercise43/data.txt"
         val conf=new SparkConf().setAppName("WordCount").setMaster("local[2]")
         val sc=new SparkContext(conf)
         val textFile=sc.textFile(inputfile)
         val wordCount=textFile.map(line=>(line.split(" ")(0),line.split(" ")(1).toInt)).mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect().foreach(println)
}

}

  

原文地址:https://www.cnblogs.com/zhang12345/p/12264520.html