Spark算法

wordcount

 1 package examples.wordcount;
 2 
 3 import org.apache.spark.SparkConf;
 4 import org.apache.spark.api.java.JavaSparkContext;
 5 import org.apache.spark.api.java.function.Function2;
 6 import org.apache.spark.api.java.function.PairFunction;
 7 import org.apache.spark.api.java.function.VoidFunction;
 8 import scala.Tuple2;
 9 
10 import java.util.*;
11 public class WordCount {
12     public static void main(String[] args) {
13         SparkConf sparkConf = new SparkConf();
14         sparkConf.setAppName("wordcount").setMaster("local[*]");
15         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
16         List<String> list = Arrays.asList("hello","hello","world");
17         jsc.parallelize(list,2).mapToPair(new PairFunction<String, String, Integer>() {
18             @Override
19             public Tuple2 call(String obj) {
20                 return new Tuple2<String,Integer>(obj,new Integer(1));
21             }
22         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
23             @Override
24             public Integer call(Integer i1,Integer i2) {
25                 return i1+i2;
26             }
27         }).sortByKey(false).foreach(new VoidFunction<Tuple2<String, Integer>>() {
28             @Override
29             public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
30                 System.out.println(stringIntegerTuple2._1+":"+stringIntegerTuple2._2);
31             }
32         });
33     }
34 }

top-k

 1 package examples.topk;
 2 
 3 import org.apache.spark.SparkConf;
 4 import org.apache.spark.api.java.JavaSparkContext;
 5 import org.apache.spark.api.java.function.Function;
 6 import org.apache.spark.api.java.function.Function2;
 7 import org.apache.spark.api.java.function.PairFunction;
 8 import scala.Serializable;
 9 import scala.Tuple2;
10 
11 import java.util.Arrays;
12 import java.util.Comparator;
13 import java.util.List;
14 
15 public class TopK {
16     public static void main(String[] args) {
17         SparkConf sparkConf = new SparkConf();
18         sparkConf.setMaster("local[*]").setAppName("Top-k");
19         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
20         List<String> list = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "a", "b", "c", "d", "e", "f", "a", "b", "c", "d", "e", "a", "b", "c", "d");
21         List<Tuple2<String, Integer>> topList = jsc.parallelize(list, 5).mapToPair(new PairFunction<String, String, Integer>() {
22             @Override
23             public Tuple2 call(String key) {
24                 return new Tuple2<String, Integer>(key, new Integer(1));
25             }
26         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
27             @Override
28             public Integer call(Integer integer, Integer integer2) throws Exception {
29                 return integer + integer2;
30             }
31         }).filter(new Function<Tuple2<String, Integer>, Boolean>(){
32             @Override
33             public Boolean call(Tuple2<String,Integer> tuple2) {
34                 return tuple2._1!=null;
35             }
36 
37         }).top(5,new TupleComparator());
38         System.out.println();
39         for (Tuple2<String, Integer> tuple2 : topList) {
40             System.out.println(tuple2._1 + ":" + tuple2._2);
41         }
42     }
43     //top算子和takeOrdered算子的比较器结果刚好相反
44     public static class TupleComparator implements
45             Comparator<Tuple2<String, Integer>>, Serializable {
46 
47         private static final long serialVersionUID = 1L;
48 
49         @Override
50         public int compare(Tuple2<String, Integer> o1,
51                            Tuple2<String, Integer> o2) {
52             return Integer.compare(o1._2(), o2._2());
53         }
54     }
55 
56 
57 }
top-k

pagerank

 1 package examples.pagerank;
 2 
 3 import org.apache.spark.api.java.JavaPairRDD;
 4 import org.apache.spark.api.java.JavaSparkContext;
 5 import org.apache.spark.api.java.JavaRDD;
 6 import org.apache.spark.SparkConf;
 7 import org.apache.spark.api.java.function.Function;
 8 import org.apache.spark.api.java.function.Function2;
 9 import org.apache.spark.api.java.function.PairFunction;
10 import org.apache.spark.api.java.function.PairFlatMapFunction;
11 import scala.Tuple2;
12 
13 import java.util.*;
14 
15 public class PageRank{
16     public static void main(String[] args){
17         SparkConf conf=new SparkConf();
18         conf.setAppName("pagerank");
19         conf.setMaster("local");
20         conf.set("spark.testing.memory", "500000000");//设置运行内存大小
21         JavaSparkContext sc=new JavaSparkContext(conf);
22         //partitionBy()只对kv RDD起作用, 进行该操作后,将相同key值的数据放到同一机器上,并进行持久化操作,对后续循环中的join操作进行优化,使得省去join操作 shuffle的开销
23         ArrayList<String> list=new ArrayList<String>(4);
24         list.add("A,D");//网页之间的连接关系,A页面链接到网页D
25         list.add("B,A");
26         list.add("C,A,B");
27         list.add("D,A,C");
28         JavaRDD<String> links=sc.parallelize(list);
29         JavaPairRDD<Character,char[]> pairs =links.mapToPair(new PairFunction<String, Character, char[]>() {
30             public Tuple2<Character,char[]> call(String s) {
31                 String[] str=s.split(",");
32                 char[] ch=new char[str.length];
33                 for (int i=0;i<str.length;i++){
34                     ch[i]=str[i].charAt(0);
35                 }
36                 return new Tuple2<Character,char[]>(s.charAt(0),ch );
37             }//将字符串中保存的页面的链接关系map转换成key-values形式,key当前页面,指向的页面集合用数组表示
38         }).cache();//持久化
39         JavaPairRDD<Character,Float> ranks=sc.parallelize(Arrays.asList('A','B','C','D')).mapToPair(new PairFunction<Character, Character, Float>() {
40             public Tuple2<Character,Float> call(Character character) throws Exception {
41                 return new Tuple2<Character,Float>(character,new Float(1.0));
42             }//初始化页面权值是1.0
43         });
44         for(int i=0;i<10;i++){
45             JavaPairRDD<Character,Tuple2<char[],Float>> contribs=pairs.join(ranks);
46             JavaPairRDD<Character,Float> con=contribs.flatMapToPair(new PairFlatMapFunction<Tuple2<Character,Tuple2<char[],Float>>,Character,Float>(){
47                 public Iterable call(Tuple2<Character,Tuple2<char[],Float>> val) throws Exception{
48                     List<Tuple2<Character,Float>> list=new ArrayList<Tuple2<Character, Float>>();
49                     Float f=val._2._2;
50                     char[] ch=val._2._1();
51                     int len=ch.length;
52                     for(int i=0;i<len;i++) {
53                         Tuple2<Character, Float> map = new Tuple2<Character, Float>(new Character(ch[i]), new Float(f / len));
54                         list.add(map);
55                     }
56                     return list;
57                 }
58             });//将每个页面获得其他页面的pagerank值形成键值对的形式
59             ranks=con.reduceByKey(new Function2<Float, Float, Float>() {
60                 public Float call(Float a, Float b) { return a + b; }
61             }).mapValues(new Function<Float, Float>() {
62                 public Float call(Float a) throws Exception {
63                     return new Float(0.15+0.85*a);
64                 }
65             });//当前迭代的pagerank计算
66         }
67         List<Tuple2<Character,Float>> retList = ranks.collect();
68         for(Tuple2<Character,Float> tuple2 :retList) {
69             System.out.println(tuple2._1+":"+tuple2._2);
70         }
71     }
72 }
pagerank
原文地址:https://www.cnblogs.com/carsonwuu/p/11634215.html