spark 的 wordcount

记录spark的Wordcount小程序:
前提:hdfs已经打开
 

 
创建一个name为wc.input的文件,上传到hdfs中的/user/hadoop/spark/中,内容如上图
 
 
[root@spark00 hadoop-2.6.0-cdh5.4.0]# bin/hdfs dfs -put wc.input /user/hadoop/spark/            上传
 
[root@spark00 hadoop-2.6.0-cdh5.4.0]# bin/hdfs dfs -ls    /user/hadoop/spark/                        查看文件
 
[root@spark00 hadoop-2.6.0-cdh5.4.0]# bin/hdfs dfs -text /uesr/hadoop/spark/wc.input            查看文件内容
 
[root@spark00 spark-1.3.1]# bin/spark-shell                                                             打开spark的客户端
 
 
 
scala> val rdd=sc.textFile("hdfs://spark00:8020/user/hadoop/spark/wc.input")        读取dfs中的文件wc.input
 
val wordcount = rdd.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((a,b)=>a+b)            进行mapreduce
 
wordcount.collect                    查看所有
 
rdd.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((a,b)=>a+b).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).collect
 
sc.textFile("hdfs:spark00:8020/user/hadoop/spark/wc.input").flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((a,b)=>a+b).collect
 
sc.textFile(.....).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
 
 
 
排序:
val wordsort = wordcount.sortByKey(true)
 
val wordsort = wordcount.sortByKey(false)
 
 
wordsort.collect
 
 
RDD的认识:
 
在spark中,一个应用程序中包含多个job任务
在mapreduce中,一个job任务就是一个应用
 
RDD    的特点:
1》    分区   partitioned,split
 
2》    计算  compute
 
3》    依赖
 
 
rdd特点官网上的翻译及理解:
 
1,A list of partitions
            一系列的分片:比如64M一片,类似于Hadoop中的split
 
2,A function for computing each split 
            在每个分片上都有一个函数去迭代/执行/计算  它
 
3,A list of dependencies on other RDDs
            一系列的依赖:RDDa转换成RDDb,RDDb转换成RDDc,那么RDDc就依赖于RDDb,RDDb就依赖于RDDa
 
4,Optionally,a Partitioner for key-value RDDs(e.g. to say that the RDD is hash-partitioned)
                    对于key-value的RDD可指定一个partitioner,告诉它如何分片;常用的有hash,range
 
5,Optionally,a list of preferred location(s) to compute each split on (e.g.    block    locations    for    an    HDFS    file)
                       要运行的计算/执行最好在哪几个机器上运行,数据本地性
                    为什么会有哪几个呢?
        比如:hadoop默认有三个位置,或者spark    cache到内存是可能通过StorageLevel设置多个副本,所以一个partition可能返回多个最佳位置
                
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 





原文地址:https://www.cnblogs.com/xiaoxiao5ya/p/bc549f35cc5a5dc7265b85e6d9862438.html