spark

1、在yarn模式下运行spark作业

(1)如果想让spark作业可以运行在yarn上,那么首先必须在配置文件spark-env.sh文件中,配置HADOOP_CONF_DIR或者YARN_CONF_DIR属性,因为再使用spark-submit提交spark作业时,要先给yarn的resourcemanager发送请求,所以spark需要读spark-env.sh配置文件获取。

(2)使用yarn-cluster模式提交时,使用以下语法即可: 

./bin/spark-submit
--class path.to.your.Class
--master yarn-cluster
[options]
<app jar>
[app options]

比如如下脚本示例:

$ ./bin/spark-submit --class org.leo.spark.study.WordCount
--master yarn-cluster
--num-executors 1
--driver-memory 100m
--executor-memory 100m
--executor-cores 1
--queue hadoop队列
/usr/local/spark-study/spark-study.jar

  • partitionBy算子

 1 package mapPartitions.xls;
 2 
 3 import java.util.ArrayList;
 4 import java.util.Arrays;
 5 import java.util.HashMap;
 6 import java.util.Iterator;
 7 import java.util.List;
 8 import java.util.Map;
 9 
10 import org.apache.spark.SparkConf;
11 import org.apache.spark.api.java.JavaRDD;
12 import org.apache.spark.api.java.JavaSparkContext;
13 import org.apache.spark.api.java.function.FlatMapFunction;
14 
15 public class TransFormation09_partitionBy {
16 
17     public static void main(String[] args) {
18         // 
19         partitionBy01();
20     }
21 
22     public static void  partitionBy01(){
23         SparkConf conf = new SparkConf().setAppName("MapPartitions").setMaster("local");
24         JavaSparkContext sc = new JavaSparkContext(conf);
25 
26         List<String> studentNames = Arrays.asList("durant", "westbrook", "george", "wade");
27         JavaRDD<String> studentNamesRDD = sc.parallelize(studentNames, 2);
28 
29         final Map<String, Double> studentScoreMap = new HashMap<String, Double>();
30         studentScoreMap.put("durant", 278.5);
31         studentScoreMap.put("westbrook", 290.0);
32         studentScoreMap.put("george", 301.0);
33         studentScoreMap.put("wade", 205.0);
34 
35         JavaRDD<Double> studentScoresRDD = studentNamesRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Double>() {
36 
37                     @Override
38                     public java.util.Iterator<Double> call(Iterator<String> iterator) throws Exception {
39 
40                         List<Double> studentScoreList = new ArrayList<Double>();
41 
42                         while(iterator.hasNext()) {
43                             String studentName = iterator.next();
44                             Double studentScore = studentScoreMap.get(studentName);
45                             studentScoreList.add(studentScore);
46                         }
47                         return studentScoreList.iterator();
48                     }
49 
50                 });
51 
52         for(Double studentScore: studentScoresRDD.collect()) {
53             System.out.println(studentScore);
54         }
55 
56         sc.close();
57     }
58 }

输出结果:

278.5
290.0
301.0
205.0

  • mapPartitionsWithIndex算子

每次在进行分区数据传入时,顺便将分区号传入,(分区数量由函数parallelize指定)所以根据这个算子我们可以得知每个数据的分区编号。

 1 package mapPartitions.xls;
 2 
 3 import java.util.ArrayList;
 4 import java.util.Arrays;
 5 import java.util.Iterator;
 6 import java.util.List;
 7 
 8 import org.apache.spark.SparkConf;
 9 import org.apache.spark.api.java.JavaRDD;
10 import org.apache.spark.api.java.JavaSparkContext;
11 import org.apache.spark.api.java.function.Function2;
12 
13 public class mapPartitionsWithIndex {
14 
15     public static void main(String[] args) {
16         SparkConf conf = new SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local");
17         JavaSparkContext sc = new JavaSparkContext(conf);
18 
19         List<String> studentNames = Arrays.asList("durant", "westbrook", "george", "wade", "kobe");
20 
21         JavaRDD<String> studentNamesRDD = sc.parallelize(studentNames, 9);
22 
23         JavaRDD<String> studentWithClassRDD = studentNamesRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
24                     @Override
25                     public Iterator<String> call(Integer index, Iterator<String> iterator) throws Exception {
26 
27                         List<String> studentWithClassList = new ArrayList<String>();
28 
29                         while(iterator.hasNext()) {
30                             String studentName = iterator.next();
31                             String studentWithClass = studentName + "_" + (index + 1);
32                             studentWithClassList.add(studentWithClass);
33                         }
34 
35                         return studentWithClassList.iterator();
36                     }
37 
38                 }, true);
39 
40         for(String studentWithClass : studentWithClassRDD.collect()) {
41             System.out.println(studentWithClass);
42         }
43 
44         sc.close();
45     }
46 
47 }

输出结果:

durant_2
westbrook_4
george_6
wade_8
kobe_9

  • sample算子

 1 package mapPartitions.xls;
 2 
 3 import java.util.Arrays;
 4 import java.util.List;
 5 
 6 import org.apache.spark.SparkConf;
 7 import org.apache.spark.api.java.JavaRDD;
 8 import org.apache.spark.api.java.JavaSparkContext;
 9 
10 public class Sample {
11 
12     public static void main(String[] args) {
13         //
14         sample01();
15     }
16 
17     public static void sample01(){
18 
19         SparkConf conf = new SparkConf().setAppName("Sample").setMaster("local");
20         JavaSparkContext sc = new JavaSparkContext(conf);
21 
22         List<String> staffList = Arrays.asList("name01", "name02", "name03", "name04", "name05", "name06", "name07", "name08", "name09", "name010");
23         JavaRDD<String> staffRDD = sc.parallelize(staffList);
24 
25         JavaRDD<String> luckyStaffRDD = staffRDD.sample(false, 0.5, System.currentTimeMillis());
26 
27         for(String staff : luckyStaffRDD.collect()) {
28             System.out.println(staff);
29         }
30 
31         sc.close();
32     }
33 
34 
35 }

输出结果:

name01
name02
name03
name04
name08
name09

  • union算子

将两个RDD合并到一起。

 1 package mapPartitions.xls;
 2 
 3 import java.util.Arrays;
 4 import java.util.List;
 5 
 6 import org.apache.spark.SparkConf;
 7 import org.apache.spark.api.java.JavaRDD;
 8 import org.apache.spark.api.java.JavaSparkContext;
 9 
10 public class union {
11 
12     public static void main(String[] args) {
13         SparkConf conf = new SparkConf().setAppName("union").setMaster("local");
14         JavaSparkContext sc = new JavaSparkContext(conf);
15 
16         List<String> department1StaffList = Arrays.asList("name01", "name02", "name03", "name04");
17         JavaRDD<String> department1StaffRDD = sc.parallelize(department1StaffList);
18 
19         List<String> department2StaffList = Arrays.asList("name05", "name06", "name07", "name08");
20         JavaRDD<String> department2StaffRDD = sc.parallelize(department2StaffList);
21 
22         JavaRDD<String> departmentStaffRDD = department1StaffRDD.union(department2StaffRDD);
23 
24         for(String staff : departmentStaffRDD.collect()) {
25             System.out.println(staff);
26         }
27 
28         sc.close();
29     }
30 
31 }

运行结果:

name01
name02
name03
name04
name05
name06
name07
name08

  • intersection算子

两个RDD求交集。

 1 package spark.rdd.xls;
 2 
 3 import java.util.Arrays;
 4 import java.util.List;
 5 
 6 import org.apache.spark.SparkConf;
 7 import org.apache.spark.api.java.JavaRDD;
 8 import org.apache.spark.api.java.JavaSparkContext;
 9 
10 public class Intersection {
11 
12     public static void main(String[] args) {
13         SparkConf conf = new SparkConf().setAppName("Intersection").setMaster("local");
14         JavaSparkContext sc = new JavaSparkContext(conf);
15 
16         List<String> project1MemberList = Arrays.asList("name01", "name02", "name03", "name04");
17         JavaRDD<String> project1MemberRDD = sc.parallelize(project1MemberList);
18 
19         List<String> project2MemberList = Arrays.asList("name01", "name06", "name02", "name07");
20         JavaRDD<String> project2MemberRDD = sc.parallelize(project2MemberList);
21 
22         JavaRDD<String> projectIntersectionRDD = project1MemberRDD.intersection(project2MemberRDD);
23 
24         for(String member : projectIntersectionRDD.collect()) {
25             System.out.println(member);
26         }
27 
28         sc.close();
29     }
30 
31 }

运行结果:

name01
name02

  • distinct算子

对rdd中的数据进行去重。
 1 package spark.rdd.xls;
 2 
 3 import java.util.Arrays;
 4 import java.util.List;
 5 
 6 import org.apache.spark.SparkConf;
 7 import org.apache.spark.api.java.JavaRDD;
 8 import org.apache.spark.api.java.JavaSparkContext;
 9 import org.apache.spark.api.java.function.Function;
10 
11 public class distinct {
12 
13     public static void main(String[] args) {
14         //
15         distinct01();
16     }
17 
18     public static void distinct01(){
19         SparkConf conf = new SparkConf().setAppName("Distinct").setMaster("local");
20         JavaSparkContext sc = new JavaSparkContext(conf);
21 
22         // distinct算子
23         // 对rdd中的数据进行去重
24 
25         // uv统计案例
26         // uv:user view,每天每个用户可能对网站会点击多次
27         // 此时,需要对用户进行去重,然后统计出每天有多少个用户访问了网站
28         // 而不是所有用户访问了网站多少次(pv)
29 
30         List<String> accessLogs = Arrays.asList(
31                 "user1 2016-01-01 23:58:42",
32                 "user1 2016-01-01 23:58:43",
33                 "user1 2016-01-01 23:58:44",
34                 "user2 2016-01-01 12:58:42",
35                 "user2 2016-01-01 12:58:46",
36                 "user3 2016-01-01 12:58:42",
37                 "user4 2016-01-01 12:58:42",
38                 "user5 2016-01-01 12:58:42",
39                 "user6 2016-01-01 12:58:42",
40                 "user6 2016-01-01 12:58:45");
41         JavaRDD<String> accessLogsRDD = sc.parallelize(accessLogs);
42 
43         JavaRDD<String> useridsRDD = accessLogsRDD.map(new Function<String, String>() {
44 
45             @Override
46             public String call(String accessLog) throws Exception {
47                 String userid = accessLog.split(" ")[0];
48                 return userid;
49             }
50 
51         });
52 
53         JavaRDD<String> distinctUseridsRDD = useridsRDD.distinct();
54         int uv = distinctUseridsRDD.collect().size();
55         System.out.println("uv: " + uv);
56 
57         sc.close();
58     }
59 
60 
61 }

输出结果:

uv: 6

  • aggregateByKey算子

 1 package spark.rdd.xls;
 2 
 3 import java.util.Arrays;
 4 import java.util.List;
 5 
 6 import org.apache.spark.SparkConf;
 7 import org.apache.spark.api.java.JavaPairRDD;
 8 import org.apache.spark.api.java.JavaRDD;
 9 import org.apache.spark.api.java.JavaSparkContext;
10 import org.apache.spark.api.java.function.FlatMapFunction;
11 import org.apache.spark.api.java.function.Function2;
12 import org.apache.spark.api.java.function.PairFunction;
13 import scala.Tuple2;
14 
15 public class aggregateByKey {
16 
17     public static void main(String[] args) {
18         SparkConf conf = new SparkConf().setAppName("AggregateByKey").setMaster("local");
19         JavaSparkContext sc = new JavaSparkContext(conf);
20 
21         JavaRDD<String> lines = sc.textFile("/Users/xls/Desktop/code/bigdata/data/test", 3);
22 
23         JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
24 
25             @Override
26             public java.util.Iterator<String> call(String line) throws Exception {
27                 return Arrays.asList(line.split(" ")).iterator();
28             }
29 
30         });
31 
32         JavaPairRDD<String, Integer> pairs = words.mapToPair(
33 
34                 new PairFunction<String, String, Integer>() {
35 
36                     private static final long serialVersionUID = 1L;
37 
38                     @Override
39                     public Tuple2<String, Integer> call(String word) throws Exception {
40                         return new Tuple2<String, Integer>(word, 1);
41                     }
42 
43                 });
44 
45         JavaPairRDD<String, Integer> wordCounts = pairs.aggregateByKey(
46                 0, new Function2<Integer, Integer, Integer>() {
47                     
48                     @Override
49                     public Integer call(Integer v1, Integer v2) throws Exception {
50                         return v1 + v2;
51                     }
52 
53                 },
54 
55                 new Function2<Integer, Integer, Integer>() {
56                     @Override
57                     public Integer call(Integer v1, Integer v2)
58                             throws Exception {
59                         return v1 + v2;
60                     }
61 
62                 });
63 
64         List<Tuple2<String, Integer>> wordCountList = wordCounts.collect();
65         for(Tuple2<String, Integer> wordCount : wordCountList) {
66             System.out.println(wordCount);
67         }
68 
69         sc.close();
70     }
71 
72 }

输出:

(xuelisheng,1)
(spark,2)
(hadoop,1)
(scala,1)
(java,1)

  • cartesian算子

形成类似于sql中的笛卡尔积。

原文地址:https://www.cnblogs.com/xuelisheng/p/11506103.html