spark入门(二)RDD基础操作

1 简述

spark中的RDD是一个分布式的元素集合。

在spark中,对数据的所有操作不外乎创建RDD,转化RDD以及调用RDD操作进行求值,而这些操作,spark会自动将RDD中的数据分发到集群上,并将操作并行执行。

2 创建 RDD

创建RDD分两种:读取外部数据集,在程序中对一个集合进行并行化。

2.1 读取外部数据集:

常用的方式是读取外部的数据集,比如文本文件读入为一个RDD:

scalac版:

val lines = sc.textFile("D:workspacescala_workspacedemo.txt")

java版:

JavaRDD<String> lines = sc.textFile("D:workspacejava_workspacedemo.txt");

2.2 在程序中对集合进行并行化:

最简单的方法是把集合传给SparkContext中的parallelize()方法:

scalac版:

val lines = sc.parallelize(["a", "b"])

 java版:

JavaRDD<String> lines = sc.parallelize(Arrays.asList("a", "b"));

3 转化操作

RDD的转化操作是返回新的RDD的操作。

假设有一个日文件log.txt,希望筛选出包含error的记录。使用rdd的filter()方法如下操作:

scalac版:

val rdd = sc.textFile("log.txt")
val errorRDD = rdd .filter(line => line.contains("error"))

 java版:

JavaRDD<String> rdd = sc.textFile("log.txt");
JavaRDD<String> errorRDD = rdd.filter (
    new Function<String, Boolean>() {
        public Boolean call(String str) {
            return str.contains("errors");
        } 
    }   
)        

注意:filter()方法不会改变已有的rdd中的数据。

4 向spark 传递函数

4.1 scala

在Scala中,我们可以传递定义的内联函数,引用方法:

 1 class SearchFunctions(val query: String) {
 2   def isMatch(s: String): Boolean = {
 3     s.contains(query)
 4   }
 5   def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
 6     rdd.map(isMatch)
 7   }
 8   def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
 9     rdd.map(x => x.split(query))
10   }
11   def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
12     val query_ = this.query
13     rdd.map(x => x.split(query_))
14   }
15 }

4.2 java

 1 class Contains implements Function<String, Boolean>() {
 2     private String query;
 3 
 4     public Contains(String query) { this.query = query; }
 5 
 6     public Boolean call(String x) { return x.contains(query); }
 7 }
 8 
 9
10 RDD<String> errors = lines.filter(new Contains("error"));

 

原文地址:https://www.cnblogs.com/zcjcsl/p/7923330.html