Spark练习之wordcount,基于排序机制的wordcount

一、原理及其剖析

在这里插入图片描述

二、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>spark</groupId>
    <artifactId>com.spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>SparkTest</name>
    <url>http://maven.apache.org</url>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.10.0</scala.version>
        <spark.version>1.3.0</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <!-- scala依赖-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- spark依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- hivecontext要用这个依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- hadoop依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

    </dependencies>
    <build>
        <pluginManagement>
            <plugins>
                <!-- 编译scala的插件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- 编译java的插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- 打包插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

**备注:此处会遇到坑,一定要将scala和spark的版本对应好,如果版本不对应,一个过高或一个过低,会导致程序无法正常运行。

三、使用Java进行spark的wordcount练习

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;

/**
 * 使用Java开发本地测试的wordcount程序
 */
public class wordCount {

    public static void main(String[] args) {
        //编写Spark应用程序
        //本地执行,直接在main方法执行

        //第一步:创建SparkConf对象,设置Spark应用的配置信息
        //使用setMaster()可以设置Spark应用程序要连接的Spark集群的master节点的url
        //但是如果设置为local则代表在本地运行
        SparkConf conf = new SparkConf()
                .setAppName("workCount")
                .setMaster("local");

        //第二步:创建JavaSparkContext对象
        //在Spark中,SparkContext是Spark所有功能的一个入口,你无论是用java,scala,甚至是python编写
        //都必须要有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,
        //包括调度器(ADGSchedule、TaskScheduler),还会去到Spark Master节点上进行注册,等等。

        //SparkContext,是Spark应用中,最重要的一个对象
        //在spark中,编写不同类型的Spark应用程序,使用的SparkContext是不同的,
        // 如果使用scala,使用的就是原生的SparkContext对象
        //如果使用Java,那么就是JavaSparkContext对象
        //如果是开发Spark SQL程序,那么就是SQLContext、HiveContext
        //如果是Spark Streaming程序,那么就是它独有的SparkContext
        //以此类推

        JavaSparkContext sc = new JavaSparkContext(conf);

        //第三步:要针对输入源(hdfs文件、本地文件,等等),创建一个初始的RDD。
        //输入源中的数据会打散,分配到RDD的每个partition中,从而形成一个初始的分布式的数据集
        //这里,进行本地测试,所以针对本地文件
        //SparkContext中,用于根据文件类型的输入源创建RDD的方法,叫做textFile方法
        //在Java中,创建的普通RDD,都叫做JavaRDD
        //RDD中,有元素这种概念,如果是hdfs活着本地文件呢,创建的RDD,每一个元素就相当于是文件里的一行
        JavaRDD<String> lines = sc.textFile("C://Users//10902//Desktop//spark.txt");

        //第四步:对初始RDD进行transformation操作,也就是一些计算操作
        //通常操作会通过创建function,并配合RDD的map、flagMap等算子来执行
        //function,通常,如果比较简单,则创建指定Function的匿名内部类
        //但是如果function比较复杂,则会单独创建一个类,作为实现这个function接口的类


        //先将每一行拆分成单个的单词
        //FlatMapFunction,有两个泛型参数,分别代表了输入和输出类型
        //对于我本地的测试文件,输入肯定是String,因为是一行一行的文本,输出,也是String,因为是每一行的文本
        //flagMap:其实就是,将RDD的一个元素,给拆分成一个或多个元素
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        //接着,需要将每一个单词,映射为(单词,1)的这种格式
        //因为只有这样,才能根据单词作为key,来进行每个单词的出现次数的累加
        //mapToPair,其实就是将每个元素,映射为一个(v1,v2)这样的Tuple2类型的元素
        //tuple2是scala类型,包含了两个值
        //mapToPair这个算子,要求的是与PairFunction配合使用,第一个泛型参数代表了输入类型
        //第二个和第三个泛型参数,代表的输入的Tuple2的第一个值和第二个值的类型
        //JavaPairRdd的两个泛型参数,分别代表了tuple元素的第一个值和第二个值的类型
        JavaPairRDD<String, Integer> pairs = words.mapToPair(

                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                }
        );

        //接着,需要以单词作为key,统计每个单词出现的次数
        //这里要使用reduceByKey这个算子,对每个key对应的value,都进行reduce操作
        //比如JavaPairRdd中有个元素,分别为(hello,1)(hello,1)(hello,1)(world,1)
        //reduce操作,相当于是把第一个值和第二个值进行计算,然后再将结果与第三个值进行计算
        //比如这里的hello,那么就相当于是,首先1 + 1 = 2 ,然后再将2 + 1 = 3
        //最后返回的JavaPairRDD中的元素,也是tuple,但是第一个值就是每个key,第二个值就是key的value
        //reduce之后的结果,相当于就是每个单词出现的次数
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
                new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }
        );

        //到这里为止,我们通过几个spark算子操作,以及统计出了单词的次数
        //但是,之前我们使用的flatMap、mapToPair、reduceByKey这种操作,都叫做transformation操作
        //一个spark应用中,光是有transformation操作,是不行的,是不会执行的,必须要有一种叫做action
        //最后,使用一种叫做action操作的,比如说,foreach,来触发程序的执行
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                System.out.println(wordCount._1 + " 出现了" + wordCount._2 + " 次");
            }
        });

        sc.close();
    }

}

四、使用scala进行spark的wordcount练习


import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    /**
      * 第一步:创建Spark的配置对象SparkConf,设置Spark程序运行时的配置信息,
      * 例如说通过设置setMaster来设置程序要链接的Spark集群的Master的URL,
      * 如果设置为local,则代表Spark程序在本地运行。
      */
    val conf = new SparkConf //创建SparkConf对象
    conf.setAppName("WordCount") //设置应用程序的名称,在程序运行的监控界面可以看到名称
    conf.setMaster("local") //此时,程序在本地运行,不需要安装Spark集群

    /**
      * 第二步:创建SparkContext对象
      * SparkContext是Spark程序所有功能的唯一入口,无论是采用scala、java、Python,R等都
      * 必须有一个SparkContext。SparkContext核心作用:初始化Spark应用程序运行所需要的核心组件,包括
      * DAGScheduler,TaskScheduler、SchedulerBackend同时还会负责Spark程序往Master注册程序等。
      * SparkContext是这个Spark程序中最为至关重要的一个对象。
      */
    val sc = new SparkContext(conf)

    /**
      * 第三步:根据具体的数据源(HDFS、HBase、Local FS、DB、S3等)通过SparkContext创建RDD。
      * RDD的创建方式有三种:根据外部的数据源(HDFS)、根据Scala集合、其他的RDD操作。数据会被RDD划分成一系列的
      * Partitions,分配到每个Partition的数据属于一个Task的处理范畴
      */
    val lines = sc.textFile("C://Users//10902//Desktop//spark.txt", 1) //

    /**
      * 第四步:对初始化的RDD进行Transformation级别处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算。
      */
    /**
      * 4.1、对每一行的字符串拆分成单个的单词
      */
    val words = lines.flatMap { line => line.split(" ") } //对每一行的字符串进行单词拆分并把所有行的拆分结果通过flat合并成为一

    /**
      * 4.2、在单词拆分的基础上对每个单词实例计数为1,也就是word => (word,1)
      */
    val pairs = words.map { word => (word, 1) }

    /**
      * 4.3、在每个单词实例计数为1基础之上统计每个单词在文件中出现的总次数
      */
    val wordCounts = pairs.reduceByKey(_ + _) //对相同的key,进行value的累计

    wordCounts.foreach(map => println(map._1 + ":" + map._2))

    sc.stop()
  }
}

五、基于排序机制的wordcount

import org.apache.spark.{SparkConf, SparkContext}

object ScalaSortWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf //创建SparkConf对象
    conf.setAppName("ScalaSortWordCount") //设置应用程序的名称,在程序运行的监控界面可以看到名称
    conf.setMaster("local") //此时,程序在本地运行,不需要安装Spark集群

    val sc = new SparkContext(conf)
    val lines = sc.textFile("C://Users//xxx//Desktop//spark.txt", 1)
    val words = lines.flatMap { line => line.split("") }
    val pairs = words.map { word => (word, 1) }
    val wordCounts = pairs.reduceByKey(_ + _)

    val countWords = wordCounts.map { wordCount => (wordCount._2, wordCount._1) }
    val sortedCountWords = countWords.sortByKey(false)
    val sortedWordCounts = sortedCountWords.map { sortedCountWord => (sortedCountWord._2, sortedCountWord._1) }

    sortedWordCounts.foreach(sortWordCount => println(sortWordCount._1 + "出现了:" + sortWordCount._2 + "次"))
  }
}
原文地址:https://www.cnblogs.com/aixing/p/13327444.html