Spark实现销量统计

package com.mengyao.examples.spark.core;


import java.io.Serializable;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;


import scala.Tuple2;

/**
 * Domestic passenger-car sales statistics for April and for January-April
 * (国内乘用车4月、1-4月销量数据统计).
 * <p>
 * Reads tab-separated lines of the form
 * {@code no<TAB>model<TAB>brand<TAB>aprilSales<TAB>janToAprSales}, sums the sales per brand,
 * and produces two brand rankings in descending order: one by April sales and one by
 * January-April cumulative sales. Each ranking is printed on the driver and saved under the
 * output directory.
 *
 * @author mengyao
 */
@SuppressWarnings("all")
public class CarSaleStatistics {

    /** One input row: the sales figures of a single car model. */
    static class Sale implements Serializable {
        private static final long serialVersionUID = -5393067134730174480L;
        // rank (first input column)
        private int no;
        // car model
        private String model;
        // car maker / brand
        private String brand;
        // April sales
        private int fourSale;
        // January-April cumulative sales
        private int totalSale;

        public Sale(int no, String model, String brand, int fourSale, int totalSale) {
            this.no = no;
            this.model = model;
            this.brand = brand;
            this.fourSale = fourSale;
            this.totalSale = totalSale;
        }

        public int getNo() {
            return no;
        }

        public void setNo(int no) {
            this.no = no;
        }

        public String getModel() {
            return model;
        }

        public void setModel(String model) {
            this.model = model;
        }

        public String getBrand() {
            return brand;
        }

        public void setBrand(String brand) {
            this.brand = brand;
        }

        public int getFourSale() {
            return fourSale;
        }

        public void setFourSale(int fourSale) {
            this.fourSale = fourSale;
        }

        public int getTotalSale() {
            return totalSale;
        }

        public void setTotalSale(int totalSale) {
            this.totalSale = totalSale;
        }

        /** Returns the fields joined by tabs: no, model, brand, fourSale, totalSale. */
        @Override
        public String toString() {
            return no + "\t" + model + "\t" + brand + "\t" + fourSale + "\t" + totalSale;
        }
    }

    /**
     * Entry point.
     * <p>
     * Cluster mode: spark-submit --class com.mengyao.examples.spark.core.CarSaleStatistics --master yarn --deploy-mode cluster --driver-memory 2048m --executor-memory 1024m --executor-cores 1 --queue default examples-0.0.1-SNAPSHOT.jar /data/carsales_data/2018.4-china-car-sales_volume.txt /data/carsales_data/statistics/
     * <p>
     * Local mode: Run As &gt; Java Application. With no arguments, the job runs with a local
     * master against the bundled sample file and writes under {@code D:/}.
     *
     * @param args {@code args[0]} is the input path and {@code args[1]} is the output directory.
     *             The sub-directories {@code fourSaleRank} and {@code oneTofourSaleRank} are
     *             written beneath the output directory.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName(CarSaleStatistics.class.getName());
        if (null == args || args.length == 0) {
            // No arguments: run locally against the sample data shipped with the project.
            args = new String[]{"./src/main/resources/data/2018.4-china-car-sales_volume.txt", "D:/"};
            System.setProperty("hadoop.home.dir", "D:/softs/dev/apache/hadoop-2.7.5");
            conf.setMaster("local");
        }
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // April sales of joint-venture and domestic passenger cars in the Chinese market.
            JavaRDD<String> linesRDD = sc.textFile(args[0]);

            // Parse each line and key it by brand.
            JavaPairRDD<String, Sale> brandSalesRDD = linesRDD.mapToPair(new PairFunction<String, String, Sale>() {
                private static final long serialVersionUID = -3023653638555855696L;
                @Override
                public Tuple2<String, Sale> call(String line) throws Exception {
                    String[] fields = line.split("\t");
                    Sale sale = new Sale(Integer.parseInt(fields[0]), fields[1], fields[2], Integer.parseInt(fields[3]), Integer.parseInt(fields[4]));
                    return new Tuple2<String, Sale>(sale.getBrand(), sale);
                }
            });

            // Per-brand April total and January-April total. The result is cached because
            // both rankings below are computed from it.
            JavaPairRDD<String, Sale> brandTotalSalesRDD = brandSalesRDD.reduceByKey(new Function2<Sale, Sale, Sale>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Sale call(Sale item1, Sale item2) throws Exception {
                    // Build a new Sale instead of mutating an input value.
                    // NOTE(review): 'no' (per-model rank) is kept from item2, as before; after
                    // merging it carries no meaning for the brand.
                    return new Sale(item2.getNo(),
                            item1.getModel() + "," + item2.getModel(),
                            item2.getBrand(),
                            item1.getFourSale() + item2.getFourSale(),
                            item1.getTotalSale() + item2.getTotalSale());
                }
            }).cache();

            // Ranking by April sales: key each brand total by its April sales.
            rankPrintAndSave(brandTotalSalesRDD, new PairFunction<Tuple2<String, Sale>, Integer, Sale>() {
                private static final long serialVersionUID = 2012736852338064223L;
                @Override
                public Tuple2<Integer, Sale> call(Tuple2<String, Sale> t) throws Exception {
                    return new Tuple2<Integer, Sale>(t._2.getFourSale(), t._2);
                }
            }, "==== 4月份销量排名:", outputPath(args[1], "fourSaleRank"));

            // Ranking by January-April cumulative sales: key each brand total by that figure.
            rankPrintAndSave(brandTotalSalesRDD, new PairFunction<Tuple2<String, Sale>, Integer, Sale>() {
                private static final long serialVersionUID = 2012736852338064223L;
                @Override
                public Tuple2<Integer, Sale> call(Tuple2<String, Sale> t) throws Exception {
                    return new Tuple2<Integer, Sale>(t._2.getTotalSale(), t._2);
                }
            }, "==== 1-4月份累计销量排名:", outputPath(args[1], "oneTofourSaleRank"));
        } finally {
            // Release the context even if one of the jobs above fails.
            sc.close();
        }
    }

    /**
     * Re-keys the per-brand totals by a sales figure, sorts them in descending order, prints
     * the ranking on the driver, and saves it to {@code path}.
     *
     * @param brandTotals per-brand aggregated sales
     * @param keyBySales  maps a (brand, Sale) pair to (sales figure, Sale)
     * @param label       prefix printed before each "brand = sales" line
     * @param path        output path passed to saveAsNewAPIHadoopFile
     */
    private static void rankPrintAndSave(JavaPairRDD<String, Sale> brandTotals,
                                         PairFunction<Tuple2<String, Sale>, Integer, Sale> keyBySales,
                                         String label,
                                         String path) {
        JavaPairRDD<Integer, Sale> rankDesc = brandTotals.mapToPair(keyBySales).sortByKey(false);
        // The ranking has one row per brand, so collect() it and print on the driver.
        // RDD.foreach would print on the executors, and not in global sorted order.
        for (Tuple2<Integer, Sale> t : rankDesc.collect()) {
            // The key is the sales figure used for ranking.
            System.out.println(label + t._2.getBrand() + " = " + t._1);
        }
        // NOTE(review): the pairs are <Integer, Sale>, not the declared <NullWritable, Text>.
        // Presumably TextOutputFormat writes each side's toString(), giving lines of the form
        // "<sales>\t<Sale.toString()>". Confirm this is the intended file layout.
        rankDesc.saveAsNewAPIHadoopFile(path, NullWritable.class, Text.class, TextOutputFormat.class);
    }

    /**
     * Appends {@code name} to the output directory {@code dir}. A "/" is inserted only when
     * {@code dir} is non-empty and does not already end with a path separator.
     */
    private static String outputPath(String dir, String name) {
        if (dir.isEmpty() || dir.endsWith("/") || dir.endsWith("\\")) {
            return dir + name;
        }
        return dir + "/" + name;
    }

}

查看 HDP Spark 的 HistoryServer（地址：IP:18081），出现如下图所示的作业记录即表示运行成功：

原文地址:https://www.cnblogs.com/mengyao/p/9235500.html