【Spark-core学习之二】 RDD和算子

环境
  虚拟机:VMware 10
  Linux版本:CentOS-6.5-x86_64
  客户端:Xshell4
  FTP:Xftp4
  jdk1.8
  scala-2.10.4(依赖jdk1.8)
  spark-1.6


一、RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
RDD特性:
(1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
(2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
(3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
(4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
(5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

创建RDD:

二、Spark任务执行原理

以上图中有四个机器节点,Driver和Worker是启动在节点上的进程,运行在JVM中的进程。
 Driver与集群节点之间有频繁的通信。
 Driver负责任务(tasks)的分发和结果的回收。任务的调度。如果task的计算结果非常大就不要回收了,会造成oom。
 Worker是Standalone资源调度框架里面资源管理的从节点,也是JVM进程。
 Master是Standalone资源调度框架里面资源管理的主节点。也是JVM进程。

三、Spark代码流程
1. 创建SparkConf对象
 可以设置Application name。
 可以设置运行模式及资源需求。
2. 创建SparkContext对象
3. 基于Spark的上下文创建一个RDD,对RDD进行处理。
4. 应用程序中要有Action类算子来触发Transformation类算子执行。
5. 关闭Spark上下文对象SparkContext。

四、Transformations转换算子
Transformations 转换算子是延迟执行,也叫懒加载执行。
filter:过滤符合条件的记录数,true保留,false过滤掉。
map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。特点:输入一条,输出一条数据。
flatMap:先map后flat。与map类似,每个输入项可以映射为0到多个输出项。
sample:随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。
reduceByKey:将相同的Key根据相应的逻辑进行处理。
sortByKey/sortBy:作用在K,V格式的RDD上,对key进行升序或者降序排序。

JAVA示例: 

package com.wjy.wc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

import com.google.common.base.Optional;

import scala.Tuple2;

public class TransFormation {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("TransFormationTest");
        JavaSparkContext sc = new JavaSparkContext(conf);
        
        //1、创建RDD 查看默认分区数,第二个参数指定分区数  未指定 默认1 分区数决定task数
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a","a","b","b","b","c","d"),4);
        System.out.println("rdd默认分区数:"+rdd.partitions().size());
        
        //java创建K-V RDD使用parallelizePairs  不要使用parallelize
        List<Tuple2<String, String>> list1 = Arrays.asList(
                new Tuple2<String,String>("zhangsan","a"),
                new Tuple2<String,String>("lisi","b"),
                new Tuple2<String,String>("wangwu","c"),
                new Tuple2<String,String>("zhaoliu","d")
                );
        List<Tuple2<String, Integer>> list2 = Arrays.asList(
                new Tuple2<String,Integer>("zhangsan",1),
                new Tuple2<String,Integer>("lisi",2),
                new Tuple2<String,Integer>("wangwu",3),
                new Tuple2<String,Integer>("niuer",4)
                );
        List<Tuple2<String, String>> list3 = Arrays.asList(
                new Tuple2<String,String>("zhangsan","100"),
                new Tuple2<String,String>("lisi","200"),
                new Tuple2<String,String>("wangwu","c"),
                new Tuple2<String,String>("niuer","400")
                );
        //返回是JavaPairRDD   不是JavaRDD
        JavaPairRDD<String, String> rdd1 = sc.parallelizePairs(list1,2);
        JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(list2,3);
        JavaPairRDD<String, String> rdd3 = sc.parallelizePairs(list3,6);
        
        //2、join 必须作用在K-V格式RDD上 按照两个RDD的key去关联  只有相同的key才会被关联出来
        //    join后的分区数与父RDD分区数多的那一个相同
        JavaPairRDD<String, Tuple2<String, Integer>> join = rdd1.join(rdd2);
        join.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Integer>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Tuple2<String, Integer>> t) throws Exception {
                System.out.println(t);
            }
        });
        
        //3、leftOuterJoin 左连接 以左边的key为主 左边的key都会出现  返回的仍然是K-V RDD
        JavaPairRDD<String, Tuple2<String, Optional<Integer>>> leftOuterJoin = rdd1.leftOuterJoin(rdd2);
        leftOuterJoin.foreach(new VoidFunction<Tuple2<String,Tuple2<String,Optional<Integer>>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Tuple2<String, Optional<Integer>>> arg0) throws Exception {
                System.out.println(arg0);
                //Optional 两个子类:some和none
                //of有值  absent无值
                /*
                 * (zhangsan,(a,Optional.of(1)))
                    (wangwu,(c,Optional.of(3)))
                    (zhaoliu,(d,Optional.absent()))
                    (lisi,(b,Optional.of(2)))
                 */
                String key = arg0._1;
                String value1 = arg0._2._1;
                Optional<Integer> optional = arg0._2._2;
                if (optional.isPresent())//isPresent()判断是否有值 true有值  false无值
                {
                    System.out.println("key="+key+",value1="+value1+",value2="+optional.get());
                }
                else
                {
                    System.out.println("key="+key+",value1="+value1);
                }
                
            }
        });
        
        //4、rightOuterJoin右连接 以右边的key为主 右边的key都会出现  返回的仍然是K-V RDD
        JavaPairRDD<String, Tuple2<Optional<String>, Integer>> rightOuterJoin = rdd1.rightOuterJoin(rdd2);
        rightOuterJoin.foreach(new VoidFunction<Tuple2<String,Tuple2<Optional<String>,Integer>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Tuple2<Optional<String>, Integer>> arg0) throws Exception {
                System.out.println(arg0);
                //of代表有值  absent代表无值
                /*
                 * (zhangsan,(Optional.of(a),1))   
                    (wangwu,(Optional.of(c),3))
                    (niuer,(Optional.absent(),4))
                    (lisi,(Optional.of(b),2))
                 */
                String key = arg0._1;
                Integer value2 = arg0._2._2;
                Optional<String> optional = arg0._2._1;
                if (optional.isPresent())//isPresent()判断是否有值 true有值  false无值
                {
                    System.out.println("key="+key+",value1="+optional.get()+",value2="+value2);
                }
                else
                {
                    System.out.println("key="+key+",value2="+value2);
                }
            }
        });
        
        //5、fullOuterJoin 全连接  两边的key都会取到
        JavaPairRDD<String, Tuple2<Optional<String>, Optional<Integer>>> fullOuterJoin = rdd1.fullOuterJoin(rdd2);
        fullOuterJoin.foreach(new VoidFunction<Tuple2<String,Tuple2<Optional<String>,Optional<Integer>>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Tuple2<Optional<String>, Optional<Integer>>> arg0) throws Exception {
                System.out.println(arg0);
            }
        });
        
        //6、union 合并两个数据集。两个数据集的类型要一致。            返回新的RDD的分区数是合并RDD分区数的总和
        JavaPairRDD<String, String> union = rdd1.union(rdd3);
        System.out.println("union分区数:"+union.partitions().size());
        union.foreach(new VoidFunction<Tuple2<String,String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, String> arg0) throws Exception {
                System.out.println(arg0);
            }
        });
        
        //7、intersection     取两个数据集的交集  KV完全相同
        JavaPairRDD<String, String> intersection = rdd1.intersection(rdd3);
        intersection.foreach(new VoidFunction<Tuple2<String,String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, String> arg0) throws Exception {
                System.out.println(arg0);
            }
        });
        
        //8、subtract        取两个数据集的差集
        JavaPairRDD<String, String> subtract = rdd1.subtract(rdd3);
        subtract.foreach(new VoidFunction<Tuple2<String,String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, String> arg0) throws Exception {
                System.out.println(arg0);
            }
        });

        JavaPairRDD<String, String> rdd11 = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String,String>("zhangsan","aaa"),
                new Tuple2<String,String>("zhangsan","bbb"),
                new Tuple2<String,String>("zhangsan","ccc"),
                new Tuple2<String,String>("lisi","100"),
                new Tuple2<String,String>("lisi","200"),
                new Tuple2<String,String>("lisi","300"),
                new Tuple2<String,String>("wangwu","c11"),
                new Tuple2<String,String>("wangwu","c33"),
                new Tuple2<String,String>("zhaoliu","d")
                ));
        JavaPairRDD<String, String> rdd22 = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String,String>("zhangsan","1"),
                new Tuple2<String,String>("zhangsan","222"),
                new Tuple2<String,String>("zhangsan","333"),
                new Tuple2<String,String>("lisi","ddd"),
                new Tuple2<String,String>("lisi","eee"),
                new Tuple2<String,String>("lisi","ttt"),
                new Tuple2<String,String>("niuer","ffff")
                ));

        //9、cogroup当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>))
        //将两个rdd中相同key对应的value各自组成一个集合
        JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<String>>> cogroup = rdd11.cogroup(rdd22);
        cogroup.foreach(new VoidFunction<Tuple2<String,Tuple2<Iterable<String>,Iterable<String>>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> arg0) throws Exception {
                System.out.println(arg0);
                /*
                 * (zhangsan,([aaa, bbb, ccc],[1, 222, 333]))
                    (wangwu,([c11, c33],[]))
                    (niuer,([],[ffff]))
                    (zhaoliu,([d],[]))
                    (lisi,([100, 200, 300],[ddd, eee, ttt]))
                 */
            }
        });
        
        //10、mapPartition 与map类似,遍历的单位是每个partition上的数据 map是一个个处理。
        //类型    foreachPartition  遍历的是每个partition上的数据 foreach则是一个个遍历
        JavaRDD<String> rdd4 = sc.parallelize(Arrays.asList("a","b","c","d","e","f","g"),3);
        rdd4.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(Iterator<String> iter) throws Exception {
                List<String> list = new ArrayList<String>();
                System.out.println("begin");
                while(iter.hasNext())
                {
                    String s = iter.next();
                    System.out.println(s);
                    list.add(s);
                }
                System.out.println("end");
                return list;
            }
        }).collect();
        
        rdd4.foreachPartition(new VoidFunction<Iterator<String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Iterator<String> iter) throws Exception {
                System.out.println("开始");
                while(iter.hasNext())
                {
                    System.out.println(iter.next());
                }
                System.out.println("结束");
            }
        });
        //11、distinct(map+reduceByKey+map)  reduceByKey有分组功能
        JavaRDD<String> rdd5 = sc.parallelize(Arrays.asList("a","a","b","b","b","c","d"),4);
        JavaRDD<String> distinct = rdd5.distinct();
        distinct.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(String arg0) throws Exception {
                System.out.println(arg0);
            }
        });
        
        //12、mapPartitionWithIndex     类似于mapPartitions,
        //会将RDD中的partition索引下标带出来,index 是每个partition的索引下标
        JavaRDD<String> rdd6 = sc.parallelize(Arrays.asList(
                "love1","love2","love3","love4",
                "love5","love6","love7","love8",
                "love9","love10","love11","love12"
            ),3);
        
        //Function2  第一个参数是分区索引 第二个是这个分区值的list集合 第三个是经过call处理之后的返回值
        JavaRDD<String> mapPartitionsWithIndex = rdd6.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            //入参iter 是每个分区的集合的迭代器
            public Iterator<String> call(Integer index, Iterator<String> iter) throws Exception {
                List<String> list = new ArrayList<String>();
                
                while(iter.hasNext()) {
                    String one = iter.next();
                    list.add("rdd1 partition index = 【"+index+"】, value = 【"+one+"】");
                }
                return list.iterator();
            }
        }, true);
        mapPartitionsWithIndex.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(String v) throws Exception {
                System.out.println(v);
            }
        });;
        //13、repartition 增加或减少分区。会产生shuffle。(多个分区分到一个分区不会产生shuffle)
        //repartition 是有shuffle的算子,可以对RDD重新分区。可以增加分区,也可以减少分区。
         //repartition = coalesce(numPartitions,true)
        JavaRDD<String> repartition = mapPartitionsWithIndex.repartition(2);
        repartition.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(String v) throws Exception {
                System.out.println(v);
            }
        });
        
        //14、coalesce coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle。
        //coalesce 与repartition一样,可以对RDD进行分区,可以增多分区,也可以减少分区。
        //    coalsece(numPartitions,shuffle [Boolean = false]) false不产生shuffle  窄依赖  true就生成宽依赖  产生shuffle
        //spark有空分区的概念  但是往多了分区不会产生空分区
        //true为产生shuffle,false不产生shuffle。默认是false。
        //如果coalesce设置的分区数比原来的RDD的分区数还多的话,第二个参数设置为false不会起作用,如果设置成true,效果和repartition一样。即repartition(numPartitions) = coalesce(numPartitions,true)
        JavaRDD<String> coalesce = mapPartitionsWithIndex.coalesce(4, false);
        coalesce.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(String v) throws Exception {
                System.out.println(v);
            }
        });
        
        //15、groupByKey 作用在K,V格式的RDD上。根据Key进行分组。作用在(K,V),返回(K,Iterable <V>)。
        JavaPairRDD<String, Integer> rdd7 = sc.parallelizePairs(Arrays.asList(
        new Tuple2<String,Integer>("zhangsan",100),
        new Tuple2<String,Integer>("zhangsan",200),
        new Tuple2<String,Integer>("lisi",300),
        new Tuple2<String,Integer>("lisi",400),
        new Tuple2<String,Integer>("wangwu",500),
        new Tuple2<String,Integer>("wangwu",600)
        ),2);
        JavaPairRDD<String, Iterable<Integer>> groupByKey = rdd7.groupByKey();
        groupByKey.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Iterable<Integer>> v) throws Exception {
                System.out.println(v);
                /*
                 * (zhangsan,[100, 200])
                    (wangwu,[500, 600])
                    (lisi,[300, 400])
                 */
            }
        });
        //16、zip 将两个RDD中的元素(KV格式/非KV格式)变成一个KV格式的RDD,两个RDD的每个分区元素个数必须相同。
        JavaPairRDD<String, String> rdd8 = sc.parallelizePairs(Arrays.asList(
                new Tuple2<String,String>("zhangsan","18"),
                new Tuple2<String,String>("zhangsan","18"),
                new Tuple2<String,String>("lisi","19"),
                new Tuple2<String,String>("lisi","190"),
                new Tuple2<String,String>("wangwu","100"),
                new Tuple2<String,String>("wangwu","200")
            ),2);
        JavaPairRDD<Tuple2<String, Integer>, Tuple2<String, String>> zip = rdd7.zip(rdd8);
        zip.foreach(new VoidFunction<Tuple2<Tuple2<String,Integer>,Tuple2<String,String>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<Tuple2<String, Integer>, Tuple2<String, String>> v) throws Exception {
                System.out.println(v);
                /*
                 * ((zhangsan,100),(zhangsan,18))
                    ((zhangsan,200),(zhangsan,18))
                    ((lisi,300),(lisi,19))
                    ((lisi,400),(lisi,190))
                    ((wangwu,500),(wangwu,100))
                    ((wangwu,600),(wangwu,200))
                 */
            }
        });
        
        //17、zipWithIndex     该函数将RDD中的元素和这个元素在RDD中的索引号(从0开始)组合成(K,V)对。
        JavaPairRDD<Tuple2<String, Integer>, Long> zipWithIndex = rdd7.zipWithIndex();
        zipWithIndex.foreach(new VoidFunction<Tuple2<Tuple2<String,Integer>,Long>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<Tuple2<String, Integer>, Long> v) throws Exception {
                System.out.println(v);
                /*
                 * ((zhangsan,100),0)
                    ((zhangsan,200),1)
                    ((lisi,300),2)
                    ((lisi,400),3)
                    ((wangwu,500),4)
                    ((wangwu,600),5)
                 */
            }
        });
        
        //Action 算子
        //18、reduce  对RDD中的每个元素 使用传递的逻辑去处理
        JavaRDD<Integer> rdd9 = sc.parallelize(Arrays.asList(1,2,3,4,5));
        Integer reduce = rdd9.reduce(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                //将相邻两个元素相加返回
                return v1+v2;
            }
        });
        System.out.println(reduce);//15
        
        //19、countByKey() 对RDD中相同的key的元素计数
        Map<String, Object> countByKey = rdd8.countByKey();
        Set<Entry<String, Object>> entrySet = countByKey.entrySet();
        for(Entry<String, Object> entry :entrySet) {
            String key = entry.getKey();
            Object value = entry.getValue();
            System.out.println("key = "+key+",value ="+value);
        }
        
        //20、countByValue() 对RDD中相同的元素计数
        //根据数据集每个元素相同的内容来计数,不是针对K-V里的V,而是整个K-V。返回相同内容的元素对应的条数
        Map<Tuple2<String, String>, Long> countByValue = rdd8.countByValue();
        Set<Entry<Tuple2<String, String>, Long>> entrySet1 = countByValue.entrySet();
        for(Entry<Tuple2<String, String>, Long> entry :entrySet1) {
            Tuple2<String, String> key = entry.getKey();
            Long value = entry.getValue();
            System.out.println("key = "+key+",value ="+value);
            /*
             * key = (lisi,19),value =1
                key = (wangwu,100),value =1
                key = (zhangsan,18),value =2
                key = (lisi,190),value =1
                key = (wangwu,200),value =1
             */
        }
        
        //close()方法里调用了stop
        sc.close();
        //sc.stop();
    }
}

Scala示例:

package com.wjy

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ListBuffer

object TransFormation {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("TransFormationTest");
    val sc = new SparkContext(conf);
    //1、创建RDD 查看默认分区数,第二个参数指定分区数  未指定 默认1 分区数决定task数
    val rdd = sc.parallelize(Array("a","a","b","b","b","c","d"), 3);
    //length 或size
    println("分区数:"+rdd.partitions.size);
    //创建RDD方式2
    val rdd2 = sc.makeRDD(Array("a","b","b","b","c","d"), 2);
    //创建K-V格式RDD
    val rdd3 = sc.makeRDD(Array(("张三","v1"),("王五","v2"),("李四","v3"),("牛二","v4")));
    val rdd4 = sc.makeRDD(List(("张三","111"),("赵六","4444"),("李四","v3"),("王二","677")));
   //2、join
    rdd3.join(rdd4).foreach(println);
    /*
     * (张三,(v1,111))
                (李四,(v3,4534))
     */
    
    //3、leftOuterJoin
   rdd3.leftOuterJoin(rdd4).foreach(println);
   /*
    * (张三,(v1,Some(111)))
            (李四,(v3,Some(4534)))
            (王五,(v2,None))
            (牛二,(v4,None))
    */
   
    //4、rightOuterJoin
   rdd3.rightOuterJoin(rdd4).foreach(println);
   /*
    * (张三,(Some(v1),111))
            (王二,(None,677))
            (李四,(Some(v3),4534))
            (赵六,(None,4444))
    */
   
   //5、fullOuterJoin
   rdd3.fullOuterJoin(rdd4).foreach(println);
   /*
    * (张三,(Some(v1),Some(111)))
            (王二,(None,Some(677)))
            (李四,(Some(v3),Some(4534)))
            (赵六,(None,Some(4444)))
            (王五,(Some(v2),None))
            (牛二,(Some(v4),None))
    */
   
   //6、union
   rdd3.union(rdd4).foreach(println);
    /*
     * (张三,v1)
(王五,v2)
(李四,v3)
(牛二,v4)
(张三,111)
(赵六,4444)
(李四,4534)
(王二,677)
     */
   //7、交集intersection
   rdd3.intersection(rdd4).foreach(println);
   //8、差集subtract
   rdd3.subtract(rdd4).foreach(println);
   //9、cogroup
   rdd3.cogroup(rdd4).foreach(println);
   //10、mapPartition true表示与之前的分区器相同
   rdd3.mapPartitions(iter=>{iter}, true);  
   //11、distinct
   rdd3.distinct().foreach(println);
   //12、mapPartitionsWithIndex
   val rdd5 = sc.parallelize(Array(
            "love1","love2","love3","love4",
            "love5","love6","love7","love8",
            "love9","love10","love11","love12"
            ),3);
   rdd5.mapPartitionsWithIndex((index,iter)=>{
     val list=new ListBuffer[String]();
     while(iter.hasNext){
       list.+=("rdd1 partition index = "+index+",value = "+iter.next());
     }
     list.iterator;
     }, true);
   //13、repartition
   rdd5.repartition(4).foreach(println);
   //14、coalesce
   rdd5.coalesce(4, false).foreach(println);
   //15、groupByKey
   val rdd6 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"),(3,"d")),3);
   rdd6.groupByKey().foreach(println);
   //16、zipWithIndex
   rdd6.zipWithIndex().foreach(println);
   //17、zip
   val rdd8= sc.parallelize(Array((11,"aa"),(22,"bb"),(33,"cc"),(33,"dd")),3);
   rdd6.zip(rdd8).foreach(println);
   /*
    * ((1,a),(11,aa))
            ((2,b),(22,bb))
            ((3,c),(33,cc))
            ((3,d),(33,dd))
    */
   
   //Action 算子
   //18、countByKey
   rdd6.countByKey().foreach(println);
   //19、countByValue
   rdd6.countByValue().foreach(println);
   //20、reduce
   val rdd7=sc.parallelize(Array(1,2,3,4,5));
   val rev = rdd7.reduce((v1,v2)=>{
     v1+v2;
   });
   println(rev);//15
   
   sc.stop();
  }

}

五、Action行动算子
Action类算子叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

count:返回数据集中的元素数。会在结果计算完成后回收到Driver端。
take(n):返回一个包含数据集前n个元素的集合。
first:first=take(1),返回数据集中的第一个元素。
foreach:循环遍历数据集中的每个元素,运行相应的逻辑。
collect:将计算结果回收到Driver端。

 

六、控制算子
控制算子有三种:cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。
cache和persist都是懒执行的。必须有一个action类算子触发执行。
checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

1、cache:默认将RDD的数据持久化到内存中,cache是懒执行。

 SparkConf conf = new SparkConf();
 conf.setMaster("local").setAppName("CacheTest");
 JavaSparkContext jsc = new JavaSparkContext(conf);
 JavaRDD<String> lines = jsc.textFile("./NASA_access_log_Aug95");

 lines = lines.cache();
 long startTime = System.currentTimeMillis();
 long count = lines.count();
 long endTime = System.currentTimeMillis();
 System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+ (endTime-startTime));
        
 long countStartTime = System.currentTimeMillis();
 long countrResult = lines.count();
 long countEndTime = System.currentTimeMillis();
 System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime-countStartTime));
        
 jsc.stop();

2、persist:可以指定持久化的级别。最常用的是MEMORY_ONLY,MEMORY_AND_DISK,MEMORY_ONLY_SER,MEMORY_AND_DISK_SER。”_2”表示有副本数。
持久化级别如下:


cache和persist的注意事项:
(1)cache和persist都是懒执行,必须有一个action类算子触发执行。
(2)cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
(3)cache和persist算子后不能立即紧跟action算子。
错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。

3、checkpoint
checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。
checkpoint 的执行原理:
(1)当RDD的job执行完毕后,会从finalRDD从后往前回溯。
(2)当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
(3)Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
使用:

SparkConf conf = new SparkConf();
 conf.setMaster("local").setAppName("checkpoint");
 JavaSparkContext sc = new JavaSparkContext(conf);
 sc.setCheckpointDir("./checkpoint");
 JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));
 parallelize.checkpoint();
 parallelize.count();
 sc.stop();


参考:
Spark:https://www.cnblogs.com/qingyunzong/category/1202252.html

Spark之RDD

原文地址:https://www.cnblogs.com/cac2020/p/10637310.html