spark记录(4)spark算子之Action

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

(1)reduce

reduce其实是讲RDD中的所有元素进行合并,当运行call方法时,会传入两个参数,在call方法中将两个参数合并后返回,而这个返回值回合一个新的RDD中的元素再次传入call方法中,继续合并,直到合并到只剩下一个元素时。

代码

    public static void reduce() {
        JavaRDD<String> rdd = jsc.textFile("words");
        String reduce = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).reduce(new Function2<String, String, String>() {
            
            private static final long serialVersionUID = 1L;

            @Override
            public String call(String a, String b) throws Exception {
                return a+"-->"+b;
            }
        });
        System.out.println(reduce);
        
    }

结果:

(2)collect()

 将计算的结果作为集合拉回到driver端,一般在使用过滤算子或者一些能返回少量数据集的算子后,将结果回收到Driver端打印显示。

分布式环境下尽量规避,如有其他需要,手动编写代码实现相应功能就好。

详情请参考:https://blog.csdn.net/Fortuna_i/article/details/80851775

代码:

    public static void collect() {
        JavaRDD<String> rdd = jsc.textFile("words");
        List<String> collect = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).collect();
        for (String string : collect) {
            System.out.println(string);
        }
    }

结果:

(3)take

返回一个包含数据集前n个元素的数组(从0下标到n-1下标的元素),不排序。

代码:

    public static void take() {
        JavaRDD<String> rdd = jsc.textFile("words");
        List<String> take = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).take(5);
        for (String string : take) {
            System.out.println(string);
        }
    }

结果:

(4)first

返回数据集的第一个元素(底层即是take(1))

代码:

    public static void first() {
        JavaRDD<String> rdd = jsc.textFile("words");
        String first = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).first();
        System.out.println(first);

    }

结果:

(5)takeSample(withReplacement, num, [seed])

对于一个数据集进行随机抽样,返回一个包含num个随机抽样元素的数组,withReplacement表示是否有放回抽样,参数seed指定生成随机数的种子。

该方法仅在预期结果数组很小的情况下使用,因为所有数据都被加载到driver端的内存中。

代码:

    public static void takeSample() {
        JavaRDD<String> rdd = jsc.textFile("words");
        List<String> takeSample = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).takeSample(false, 5, 1);
        for (String string : takeSample) {
            System.out.println(string);
        }
    }

结果:

(6)count

返回数据集中元素个数,默认Long类型。

代码:

    public static void count() {
        JavaRDD<String> rdd = jsc.textFile("words");
        long count = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).count();
        System.out.println(count);
    }

结果:

 (7)takeOrdered(n,[ordering])

返回RDD中前n个元素,并按默认顺序排序(升序)或者按自定义比较器顺序排序。

代码:

    public static void takeOrdered() {
        JavaRDD<String> rdd = jsc.textFile("words");
        List<String> takeSample = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).takeOrdered(4);
        for (String string : takeSample) {
            System.out.println(string);
        }
    }

结果:

(8)saveAsTextFile(path)

将rdd中元素以文本文件的形式写入本地文件系统或者HDFS等。Spark将对每个元素调用toString方法,将数据元素转换为文本文件中的一行记录。

若将文件保存到本地文件系统,那么只会保存在executor所在机器的本地目录。

代码:

    public static void saveAsTextFile() {
        JavaRDD<String> rdd = jsc.textFile("words");
        rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).saveAsTextFile("C:/Users/Administrator/Documents/result/data.txt");
    }

结果:

(9) saveAsSequenceFile(path)

将rdd中元素以Hadoop SequenceFile的形式写入本地文件系统或者HDFS等。(对pairRDD操作)

java api中无该方法

代码:

  def saveAsSequenceFile: Unit = {
    val rdd1 = sc.textFile("words", 2)
    rdd1.flatMap(_.split(" ")).map(new Tuple2(_,1))
      .saveAsSequenceFile("C:/Users/Administrator/Documents/result/data2")
  }

结果:

(10)saveAsObjectFile(path)(Java and Scala)

将数据集中元素以ObjectFile形式写入本地文件系统或者HDFS等。

代码:

    public static void saveAsObjectFile() {
        JavaRDD<String> rdd = jsc.textFile("words");
        rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).saveAsObjectFile("C:/Users/Administrator/Documents/result/data3");
    }

结果:

(11)countByKey()

作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。返回一个Map<K,Object>

代码:

    public static void countByKey() {
        JavaRDD<String> rdd = jsc.textFile("words");
        JavaPairRDD<String, Integer> mapToPair = rdd.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String a) throws Exception {
                return Arrays.asList(a.split(" "));
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String a) throws Exception {
                return new Tuple2<String, Integer>(a, 1);
            }
        });
        Map<String, Object> key = mapToPair.countByKey();
        for (Entry<String, Object> entry : key.entrySet()) {
            System.out.println("key:"+entry.getKey()+"value:"+entry.getValue());
        }
    }

结果:

(12)countByValue

countByValue()函数与tuple元组中的(k,v)中的v 没有关系,这点要搞清楚,countByValue是针对Rdd中的每一个元素对象,

而 countByKey 主要针对的事tuple(k,v)对象,并且与k 是有关系的,countByKey根据tuple(k,v)中的 k 进行统计的。使用的时候要区分。

代码:

public class Operator_countByValue {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("countByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<Integer,String>(1,"a"),
                new Tuple2<Integer,String>(2,"b"),
                new Tuple2<Integer,String>(2,"c"),
                new Tuple2<Integer,String>(3,"c"),
                new Tuple2<Integer,String>(4,"d"),
                new Tuple2<Integer,String>(4,"d")
        ));
        
        Map<Tuple2<Integer, String>, Long> countByValue = parallelizePairs.countByValue();
        
        for(Entry<Tuple2<Integer, String>, Long> entry : countByValue.entrySet()){
            System.out.println("key:"+entry.getKey()+",value:"+entry.getValue());
        }
    }
}

结果:

(13) foreach 、foreachPartition

foreach不多说

foreachPartition因为没有返回值并且是action操作,一般都是在程序末尾比如说要落地数据到存储系统中如mysql,es,或者hbase中,可以用它。

public class Operator_foreachPartition {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("foreachPartition");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("./words.txt",3);
        lines.foreachPartition(new VoidFunction<Iterator<String>>() {

            private static final long serialVersionUID = -2302401945670821407L;

            @Override
            public void call(Iterator<String> t) throws Exception {
                System.out.println("创建数据库连接。。。");
                while(t.hasNext()){
                    System.out.println(t.next());
                }
                
            }
        });
        
        sc.stop();
    }
}
原文地址:https://www.cnblogs.com/kpsmile/p/10440486.html