Spark练习之Transformation操作开发

一、map:将集合中的每个元素乘以2

1.1 Java

/**
     * map算子:将集合中的每一个元素都乘以2
     */
    private static void map() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("map")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        //构造集合
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        //并行化集合,创建初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        //使用map算子,将集合中的每个元素都乘以2
        //map算子,是对任何类型的RDD,都可以调用的
        //在Java中,map算子接收的参数时Function对象
        //创建的Function对象,一定会让你设置第二个泛型参数,这个泛型参数,就是返回的新元素的类型
        //同时call()方法的返回类型,也必须与第二个泛型类型同步
        //在call()方法内部,就可以对原始RDD中的每一个元素进行各种处理和计算,并返回一个新的元素
        //所有新的元素就会组成一个新的RDD
        JavaRDD<Integer> multipleNUmberRDD = numberRDD.map(new Function<Integer, Integer>() {
            private static final long serivalVersionUID = 1L;

            //传入call方法的,是1,2,3,4,5
            //返回的就是2,4,6,8,10
            @Override
            public Integer call(Integer integer) throws Exception {
                return integer * 2;
            }
        });
        //打印新的RDD
        multipleNUmberRDD.foreach(new VoidFunction<Integer>() {
            private static final long serivalVersionUID = 1L;

            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

        //关闭JavaSparkContext
        sc.close();
    }

1.2 Scala

def map(): Unit = {
    val conf = new SparkConf().setAppName("map").setMaster("local")
    val sc = new SparkContext(conf)
    val numbers = Array(1, 2, 3, 4, 5)
    val numberRDD = sc.parallelize(numbers, 1)
    val multipleNumberRDD = numberRDD.map(num => num * 2)
    multipleNumberRDD.foreach(num => println(num))

  }

二、filter:过滤出集合中的偶数

2.1 Java

/**
     * filter算子:过滤集合中的偶数
     */
    private static void filter() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("map")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        //构造集合
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        //并行化集合,创建初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        //对初始化RDD执行filter算子,过滤出其中的偶数
        //filter算子操作,传入的也是Function,其他的使用注意点,和map是一样的
        //但是,唯一的不同,就是call()方法的返回类型是Boolean
        //每一个初始RDD中的元素,都会传入call()方法,此时你可以执行各种自定义的计算逻辑
        //来判断这个元素是否是你想要的
        //如果想在新的RDD中保留这个元素,那么就返回true,否则,返回false
        JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {
            private static final long serivalVersionUID = 1L;

            //传入call方法的,是1,2,3,4,5
            //返回的就是2,4,6,8,10
            @Override
            public Boolean call(Integer integer) throws Exception {
                return integer % 2 == 0;
            }
        });
        //打印新的RDD
        evenNumberRDD.foreach(new VoidFunction<Integer>() {
            private static final long serivalVersionUID = 1L;

            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

        //关闭JavaSparkContext
        sc.close();
    }

2.2 Scala

def filter(): Unit = {
    val conf = new SparkConf().setAppName("filter").setMaster("local")
    val sc = new SparkContext(conf)
    val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numberRDD = sc.parallelize(numbers, 1)
    val multipleNumberRDD = numberRDD.filter(num => num % 2 == 0)
    multipleNumberRDD.foreach(num => println(num))
  }

三、flatMap:将行拆分为单词

3.1 Java

/**
     * flatMap算子:过滤集合中的偶数
     */
    private static void flatMap() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("flatMap")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        //构造集合
        List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
        //并行化集合,创建初始RDD
        JavaRDD<String> lines = sc.parallelize(lineList);
        //对初始化RDD执行flatMap算子,将每一行文本,拆分为多个单词
        //flatMap算子,在Java中,接收的参数的FlagMapFunction
        //需要自定义FlatMapFunction的第二个泛型类型,即代表了返回的新元素的类型
        //call()方法,返回的类型,不是U,而是Iterable<U>,这里的U也与第二个泛型类型相同
        //flatMap其实就是,接收原始RDD中的每个元素,并进行各种逻辑的计算和处理,返回可以返回多个元素
        //多个元素,即封装在Iterator集合中,可以使用ArrayList等集合
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serivalVersionUID = 1L;

            @Override
            public Iterator<String> call(String s) throws Exception {
                return (Iterator<String>) Arrays.asList(s.split(" "));
            }
        });
        //打印新的RDD
        words.foreach(new VoidFunction<String>() {
            private static final long serivalVersionUID = 1L;

            @Override
            public void call(String t) throws Exception {
                System.out.println(t);
            }
        });

        //关闭JavaSparkContext
        sc.close();
    }

3.2 Scala

 def flatMap(): Unit = {
    val conf = new SparkConf().setAppName("flatMap").setMaster("local")
    val sc = new SparkContext(conf)
    val lineArray = Array("hello you", "hello me", "hello world")
    val lines = sc.parallelize(lineArray, 1)
    val words = lines.flatMap(line => line.split(" "))
    words.foreach(word => println(word))
  }

四、groupByKey:将每个班级的成绩进行分组

4.1 Java

/**
     * groupNyKey算子:按照班级对成绩进行分组
     */
    private static void groupByKey() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("groupByKey")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        //构造集合
        List<Tuple2<String, Integer>> scoresList = Arrays.asList(
                new Tuple2<>("class1", 80),
                new Tuple2<>("class2", 88),
                new Tuple2<>("class1", 80),
                new Tuple2<>("class2", 90));
        //并行化集合,创建JavaPairRDD
        JavaPairRDD<String, Integer> scores = sc.<String, Integer>parallelizePairs(scoresList);
        //针对Scores RDD,执行groupByKey算子,对每个班级的成绩进行分组
        //groupByKey算子,返回的还是JavaPairRDD
        //但是,JavaPairRDD的第一个泛型类型不变,第二个泛型类型会变成Iterable这种集合类型
        //也就是说,按照了Key进行分组,每个key可能都会有多个value,此时多个value聚合成了Iterable
        //那么接下来,就可以通过groupedScores这种JavaPairRDD处理某个分组内的数据
        JavaPairRDD<String, Iterable<Integer>> groupEdScores = scores.groupByKey();
        //打印groupedScores RDD
        groupEdScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            private static final long serivalVersionUID = 1L;

            //对每个key,都会将其value,依次传入call方法
            //从而聚合出每个key对应的一个value
            //然后,将每个key对应的一个value,组合成一个Tuple2,作为新RDD的元素
            @Override
            public void call(Tuple2<String, Iterable<Integer>> stringIterableTuple2) throws Exception {
                System.out.println("class:" + stringIterableTuple2._1);
                Iterator<Integer> ite = stringIterableTuple2._2.iterator();
                while (ite.hasNext()) {
                    System.out.println(ite.next());
                }
                System.out.println("====================================");
            }
        });
        //关闭JavaSparkContext
        sc.close();
    }

2.2 Scala

def groupByKey(): Unit = {
    val conf = new SparkConf().setAppName("groupByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val scoreList = Array(new Tuple2[String, Integer]("class1", 80),
      new Tuple2[String, Integer]("class2", 88),
      new Tuple2[String, Integer]("class1", 80),
      new Tuple2[String, Integer]("class2", 90))
    val scores = sc.parallelize(scoreList, 1)
    val groupedScores = scores.groupByKey()
    groupedScores.foreach(
      score => {
        println(score._1)
        score._2.foreach(singleScore => println(singleScore))
        println("===========")
      }
    )
  }

五、reduceByKey:统计每个班级的总分

5.1 Java

/**
     * reduceNyKey算子:统计每个班级的总分
     */
    private static void reduceNyKey() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("reduceNyKey")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        //构造集合
        List<Tuple2<String, Integer>> scoresList = Arrays.asList(
                new Tuple2<>("class1", 80),
                new Tuple2<>("class2", 88),
                new Tuple2<>("class1", 80),
                new Tuple2<>("class2", 90));
        //并行化集合,创建JavaPairRDD
        JavaPairRDD<String, Integer> scores = sc.<String, Integer>parallelizePairs(scoresList);
        //针对Scores RDD,执行reduceByKey算子
        //reduceByKey,接收的参数时Function2类型,它有三个泛型参数,实际上代表了3个值
        //第一个泛型类型和第二个泛型类型,代表了原始RDD中的元素的value的类型
        //因此对每个key进行reduce,都会依次将第一个、第二个value传入,将值再与第三个value传入
        //因此此处,会自动定义两个泛型类型,代表call()方法的两个传入参数的类型
        //第三个泛型类型,代表了每次reduce操作返回的值的类型,默认也是与原始RDD的value类型相同的
        //reduceByKey算子返回的RDD,还是JavaPairRDD<key,value>
        JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serivalVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //打印totalScore RDD
        //打印groupedScores RDD
        totalScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serivalVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> t) throws Exception {
                System.out.println(t._1 + ":" + t._2);

            }
        });
        //关闭JavaSparkContext
        sc.close();
    }

5.2 Scala

def reduceByKey(): Unit = {
    val conf = new SparkConf().setAppName("reduceByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val scoreList = Array(new Tuple2[String, Integer]("class1", 80),
      new Tuple2[String, Integer]("class2", 88),
      new Tuple2[String, Integer]("class1", 80),
      new Tuple2[String, Integer]("class2", 90))
    val scores = sc.parallelize(scoreList, 1)
    val totalScores = scores.reduceByKey(_ + _)
    totalScores.foreach(classScore => println(classScore._1 + ":" + classScore._2))
  }

六、sortByKey:将学生分数进行排序

6.1 Java

/**
     * sortByKey算子:按照学生分数进行排序
     */
    private static void sortByKey() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("sortByKey")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        //构造集合
        List<Tuple2<Integer, String>> scoresList = Arrays.asList(
                new Tuple2<>(65, "leo"),
                new Tuple2<>(60, "tom"),
                new Tuple2<>(90, "marry"),
                new Tuple2<>(88, "jack"));
        //并行化集合,创建JavaPairRDD
        JavaPairRDD<Integer, String> scores = sc.<Integer, String>parallelizePairs(scoresList);

        //对scoreRDD执行sortByKey算子
        //sortByKey其实就是根据key进行排序,可以手动指定升序,或者降序
        //返回的,还是JavaPairRDD,其中的元素内容和原始的RDD一样
        //只是RDD中的元素顺序不同了
        JavaPairRDD<Integer, String> sortedScored = scores.sortByKey();

        //打印sortedScored RDD
        sortedScored.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            private static final long serivalVersionUID = 1L;

            @Override
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println(t._1 + ":" + t._2);
            }
        });
        //关闭JavaSparkContext
        sc.close();
    }

6.2 Scala

def sortByKey(): Unit = {
    val conf = new SparkConf().setAppName("sortByKey").setMaster("local")
    val sc = new SparkContext(conf)
    val scoreList = Array(new Tuple2[Integer, String](90, "cat"),
      new Tuple2[Integer, String](80, "leo"),
      new Tuple2[Integer, String](80, "opp"),
      new Tuple2[Integer, String](55, "lll"))
    val scores = sc.parallelize(scoreList, 1)
    val totalScores = scores.sortByKey()
    totalScores.foreach(studentScore => println(studentScore._1 + ":" + studentScore._2))
  }

七、join:打印每个学生的成绩

7.1 Java

/**
     * join算子:打印学生成绩
     */
    private static void join() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("join")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        //学生集合
        List<Tuple2<Integer, String>> studentList = Arrays.asList(
                new Tuple2<>(1, "leo"),
                new Tuple2<>(2, "tom"),
                new Tuple2<>(3, "marry"),
                new Tuple2<>(4, "jack"));
        //分数集合
        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<>(1, 100),
                new Tuple2<>(2, 80),
                new Tuple2<>(3, 50),
                new Tuple2<>(4, 20));
        //并行化两个RDD
        JavaPairRDD<Integer, String> student = sc.<Integer, String>parallelizePairs(studentList);
        JavaPairRDD<Integer, Integer> scores = sc.<Integer, Integer>parallelizePairs(scoreList);

        //使用join算子关联两个RDD
        //join以后,还是会根据key进行join,并返回JavaPairRDD
        //但是JavaPairRDD的第一个泛型类型,之前两个JavaPairRDD的key类型,因为是通过key进行join的
        //第二个泛型类型,是Tuple2<v1,v2>的类型,Tuple2的两个反省分别为原始RDD的value的类型
        //join,就返回的RDD的每一个元素,就是通过Key join上的一个pair
        //比如有(1,1)(1,2)(1,3)的一个RDD和(1,4)(2,1)(2.2)的一个RDD
        //join以后,实际上会得到(1,(1,4))(1,(2,4))(1,(3,4))
        JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = student.<Integer>join(scores);

        //打印sortedScored RDD
        studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
                System.out.println("student id:" + t._1);
                System.out.println("student name:" + t._2._1);
                System.out.println("student score:" + t._2._2);
                System.out.println("==============");
            }
        });


        //关闭JavaSparkContext
        sc.close();
    }

7.2 Scala

def join(): Unit = {
    val conf = new SparkConf().setAppName("join").setMaster("local")
    val sc = new SparkContext(conf)

    //学生集合
    val studentList = Array(
      new Tuple2[Integer, String](1, "leo"),
      new Tuple2[Integer, String](2, "tom"),
      new Tuple2[Integer, String](3, "marry"))
    //分数集合
    val scoreList = Array(
      new Tuple2[Integer, Integer](1, 100), new Tuple2[Integer, Integer](2, 80),
      new Tuple2[Integer, Integer](3, 50), new Tuple2[Integer, Integer](1, 70),
      new Tuple2[Integer, Integer](2, 10), new Tuple2[Integer, Integer](3, 40))

    val student = sc.parallelize(studentList)
    val scores = sc.parallelize(scoreList)

    val studentScores = student.join(scores)
    studentScores.foreach(studentScore => println({
      System.out.println("student id:" + studentScore._1)
      System.out.println("student name:" + studentScore._2._1)
      System.out.println("student score:" + studentScore._2._2)
      System.out.println("==============")
    }))
  }

八、cogroup:打印每个学生的成绩

8.1 Java

/**
     * cogroup算子:打印学生成绩
     */
    private static void cogroup() {
        //创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("cogroup")
                .setMaster("local");
        //创建JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        //学生集合
        List<Tuple2<Integer, String>> studentList = Arrays.asList(
                new Tuple2<>(1, "leo"),
                new Tuple2<>(2, "tom"),
                new Tuple2<>(3, "marry"));
        //分数集合
        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<>(1, 100),
                new Tuple2<>(2, 80),
                new Tuple2<>(3, 50),
                new Tuple2<>(1, 70),
                new Tuple2<>(2, 10),
                new Tuple2<>(3, 40));
        //并行化两个RDD
        JavaPairRDD<Integer, String> student = sc.<Integer, String>parallelizePairs(studentList);
        JavaPairRDD<Integer, Integer> scores = sc.<Integer, Integer>parallelizePairs(scoreList);

        //cogroup与join不同
        //相当于是,一个key join上的所有value,都给放到一个Iterable里面去了
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = student.<Integer>cogroup(scores);

        //打印sortedScored RDD
        studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
                System.out.println("student id:" + t._1);
                System.out.println("student name:" + t._2._1);
                System.out.println("student score:" + t._2._2);
                System.out.println("==============");
            }
        });
        //关闭JavaSparkContext
        sc.close();
    }

九、main函数

9.1 Java

public static void main(String[] args) {
        //map();
        //filter();
        //flatMap();
        //groupByKey();
        //reduceNyKey();
        //sortByKey();
        //join();
        cogroup();
    }

9.2 Scala

def main(args: Array[String]) {
    //map()
    //filter()
    //flatMap()
    //groupByKey()
    //reduceByKey()
    //sortByKey()
    join()
  }
原文地址:https://www.cnblogs.com/aixing/p/13327440.html