Getting Started with Spark (Word Count)

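1、Features

  1. Lightning-fast cluster computing: the Spark project advertises up to 100x the speed of Hadoop MapReduce in memory and up to 10x on disk.
  2. A fast, general-purpose engine for large-scale computation: APIs for Java, Scala, Python, and R; more than 80 high-level operators that make it easy to build parallel applications; SQL, stream processing, and complex analytics can be combined in one application.
  3. Runtime environments: Hadoop, Mesos, standalone mode, and others.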

2、Spark modules

  1. Spark Core: the core module
  2. Spark SQL
  3. Spark Streaming: stream processing
  4. Spark MLlib: machine learning
  5. Spark GraphX: graph computation

3、Installing Spark

  1. When setting the environment variables, remember to add both the sbin and the bin directory to the path.

4、Starting Spark

  1. Change into the bin directory and launch spark-shell.

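5、Spark programming

  1. sc: the SparkContext, the entry point of a Spark program. It exposes the core methods, and spark-shell creates it for you under the name sc.
  2. RDD: Resilient Distributed Dataset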

6、A first Spark example: word count

scala> val rdd1 = sc.textFile("/home/test.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /home/test.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> val rdd2 = rdd1.flatMap(line => line.split(" "))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26

scala> val rdd3 = rdd2.ma
map   mapPartitions   mapPartitionsWithIndex   max

scala> val rdd3 = rdd2.map(word => (word,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:28

scala> val rdd4 = rdd3.reduce
reduce   reduceByKey   reduceByKeyLocally

scala> val rdd4 = rdd3.reduceByKey
reduceByKey   reduceByKeyLocally

scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:30

scala> rdd4.collect()
res0: Array[(String, Int)] = Array((world2,1), (world1,1), (hello,3), (world3,1))

The code above can be condensed into a single chained expression:
scala> sc.textFile("/home/test.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res3: Array[(String, Int)] = Array((world2,1), (world1,1), (hello,3), (world3,1))
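
To see what each stage of the chain contributes, the same steps can be traced on an ordinary Scala collection. This is only a sketch: it uses plain `Seq` operations rather than an RDD, `groupBy` followed by a sum stands in for `reduceByKey`, and the three input lines are assumed file contents chosen to match the counts shown above, not the real test.txt.

```scala
object WordCountSketch {
  // Assumed stand-in for the contents of /home/test.txt (hypothetical data).
  val lines: Seq[String] = Seq("hello world1", "hello world2", "hello world3")

  // flatMap: split every line into words and flatten into one sequence.
  val words: Seq[String] = lines.flatMap(_.split(" "))

  // map: pair every word with an initial count of 1.
  val pairs: Seq[(String, Int)] = words.map((_, 1))

  // Stand-in for reduceByKey(_ + _): group the pairs by word, then sum the counts.
  val counts: Map[String, Int] =
    pairs.groupBy(_._1).map { case (word, ps) => (word, ps.map(_._2).sum) }

  def main(args: Array[String]): Unit =
    counts.toSeq.sortBy(_._1).foreach(println)
}
```

Unlike this local version, Spark distributes the work across partitions, which is why the order of the pairs returned by collect is not sorted.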

7、Filtering words

scala> sc.textFile("/home/test.txt").flatMap(_.split(" ")).filter(_.contains("wor")).map((_,1)).reduceByKey(_+_).collect
res4: Array[(String, Int)] = Array((world2,1), (world1,1), (world3,1))

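filter keeps only the elements you want, that is, those for which the predicate returns true; here, the words that contain "wor".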
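
The predicate passed to filter decides, element by element, what stays. A small sketch on a plain Scala `Seq` (not an RDD; the word list is made up for illustration) shows the idea, with a negated predicate selecting the complement:

```scala
object FilterSketch {
  // Hypothetical word list, standing in for the output of flatMap.
  val words: Seq[String] = Seq("hello", "world1", "hello", "world2")

  // filter keeps the elements for which the predicate is true.
  val kept: Seq[String] = words.filter(_.contains("wor"))

  // Negating the predicate keeps everything else.
  val dropped: Seq[String] = words.filter(w => !w.contains("wor"))

  def main(args: Array[String]): Unit = {
    println(kept)
    println(dropped)
  }
}
```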

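8、Running the Spark word count from IntelliJ (Scala)

First, a note on a problem I ran into: the compiler error "is already defined as object". The cause was that I had marked the entire project directory as a source root, so the same object was picked up twice and reported as a duplicate definition.

The code: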
import org.apache.spark.{SparkConf, SparkContext}

object Demo111 {
  def main(args: Array[String]): Unit = {

    /**
      * About SparkConf
      */
    /*
     /** Create a SparkConf that loads defaults from system properties and the classpath */
        def this() = this(true)
    */

    /**
      * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
      *
      * Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
      * values from any `spark.*` Java system properties set in your application as well. In this case,
      * parameters you set directly on the `SparkConf` object take priority over system properties.
      *
      * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
      * get the same configuration no matter what the system properties are.
      *
      * All setter methods in this class support chaining. For example, you can write
      * `new SparkConf().setMaster("local").setAppName("My app")`.
      *
      * @param loadDefaults whether to also load values from Java system properties
      *
      * @note Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
      * by the user. Spark does not support modifying the configuration at runtime.
      */

    // Create a Spark configuration object
    val conf = new SparkConf();
    conf.setAppName("wordCount");
    // Set the master property
    conf.setMaster("local");

    // Create the SparkContext from conf
    val sc = new SparkContext(conf);
    // Start the word count
    /**
      * textFile() function
      * Read a text file from HDFS, a local file system (available on all nodes), or any
      * Hadoop-supported file system URI, and return it as an RDD of Strings.
      * @param path path to the text file on a supported file system
      * @param minPartitions suggested minimum number of partitions for the resulting RDD
      * @return RDD of lines of the text file
      */
    // rdd1 holds the file contents, one element per line
    val rdd1 = sc.textFile("D:\\test.txt");
    //System.setProperty("hadoop.home.dir","E:\\hadoop\\hadoop-2.6.5")
    /** flatMap() function
      *  Return a new RDD by first applying a function to all elements of this
      *  RDD, and then flattening the results.
      */
    // Intended: split each line on spaces
    val rdd2 = rdd1.flatMap(_.split(""))
    //    println(rdd2)
    // Pair every word with an initial count of 1
    val rdd3 = rdd2.map((_,1))
    /**
      * Merge the values for each key using an associative and commutative reduce function. This will
      * also perform the merging locally on each mapper before sending results to a reducer, similarly
      * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
      */
    // Aggregate the counts by key
    val rdd4 = rdd3.reduceByKey(_ + _);
    /**
      * Return an array that contains all of the elements in this RDD.
      *
      * @note This method should only be used if the resulting array is expected to be small, as
      * all the data is loaded into the driver's memory.
      */
    // Collect the RDD into an array on the driver
    val result = rdd4.collect()
    for (i <- result) println(i)

  }

}



However, the output counted the occurrences of each character rather than each word:
(d,1)
(w,1)
(e,1)
(h,1)
( ,1)
(o,2)
(r,1)
(l,3)

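So the problem has to be in the step that splits the line into words.
I wrote split("") and left out the space between the quotes. It should be split(" "); with an empty-string delimiter the line is split into individual characters.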
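
The difference between the two delimiters is easy to check outside Spark, since `split` here is the ordinary `java.lang.String` method. A minimal sketch, assuming Java 8 or later (where an empty-string delimiter yields no leading empty element); the `\\s+` variant is an extra suggestion that is not part of the original program:

```scala
object SplitSketch {
  val line = "hello world"

  // Empty-string delimiter: every character becomes its own element.
  val chars: Array[String] = line.split("")

  // Single-space delimiter: the line is split into words.
  val words: Array[String] = line.split(" ")

  // A whitespace regex also copes with runs of spaces between words.
  val robust: Array[String] = "hello   world".split("\\s+")

  def main(args: Array[String]): Unit = {
    println(chars.mkString("[", ",", "]"))
    println(words.mkString("[", ",", "]"))
    println(robust.mkString("[", ",", "]"))
  }
}
```

The 11 single-character elements of `chars` are what the buggy run fed into the counting stages, including the space itself.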
The corrected code:
import org.apache.spark.{SparkConf, SparkContext}

object Demo111 {
  def main(args: Array[String]): Unit = {

    // Create a Spark configuration object
    val conf = new SparkConf();
    conf.setAppName("wordCount");
    // Set the master property
    conf.setMaster("local");

    // Create the SparkContext from conf
    val sc = new SparkContext(conf);

    // rdd1 holds the file contents, one element per line
    val rdd1 = sc.textFile("D:\\test.txt");
    //System.setProperty("hadoop.home.dir","E:\\hadoop\\hadoop-2.6.5")

    // Split each line on a single space and flatten the result
    val rdd2 = rdd1.flatMap(line => line.split(" "));
    //    println(rdd2)
    // Pair every word with an initial count of 1
    val rdd3 = rdd2.map(word => (word, 1));
    // Aggregate the counts by key
    val rdd4 = rdd3.reduceByKey(_ + _);
    // Collect the RDD into an array on the driver
    val result = rdd4.collect()
    for (i <- result) println(i) // equivalent to result.foreach(println)
  }

}

Output:
(hello,1)
(world,1)
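
One detail worth checking in the listing is the Windows path passed to textFile. In a Scala string literal a backslash starts an escape sequence, so `\t` is read as a tab character; the backslash has to be doubled, or a triple-quoted string used. The sketch below only inspects string literals and touches neither Spark nor the file system:

```scala
object PathLiteralSketch {
  // "\t" is an escape sequence: this string contains a tab, not a backslash.
  val wrong: String = "D:\test.txt"

  // Doubling the backslash yields the intended path.
  val escaped: String = "D:\\test.txt"

  // Triple-quoted strings do not process the \t escape.
  val raw: String = """D:\test.txt"""

  def main(args: Array[String]): Unit = {
    println(wrong.contains('\t'))
    println(escaped == raw)
  }
}
```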

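Another odd problem: I had to create a Java project and then add Scala to it before I could import Spark. When I created a Scala project directly, the Spark classes could not be imported.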

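One more small point to remember:
  // Create the SparkContext from conf
    val sc = new SparkContext(conf);
The SparkContext is built from the configuration object conf, so conf must be passed as the constructor argument.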


9、Spark word count in Java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class wordcount {
    public static void main(String[] args) {

        // As in the Scala version, start by creating a configuration object
        SparkConf conf = new SparkConf();
        conf.setAppName("wordcount_java");
        conf.setMaster("local");
        // Create the Java Spark context
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read the file; each element of rdd1 is one line of text
        JavaRDD<String> rdd1 = sc.textFile("D:\\test.txt");
        /**
         * FlatMapFunction returns an iterator:
         *
         * @FunctionalInterface
         * public interface FlatMapFunction<T, R> extends Serializable {
         *     Iterator<R> call(T t) throws Exception;
         * }
         */
        // Split each line on spaces and flatten; call() returns an iterator over the words
        JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        /**
         * @FunctionalInterface
         * public interface PairFunction<T, K, V> extends Serializable {
         *     Tuple2<K, V> call(T t) throws Exception;
         * }
         */
        // Turn each word into a (word, 1) pair
        JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        /**
         * def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
         *   fromRDD(reduceByKey(defaultPartitioner(rdd), func))
         * }
         */
        // Reduce: merge the values of each key by adding them up
        JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        /**
         * collect(): Return an array that contains all of the elements in this RDD.
         *
         * @note this method should only be used if the resulting array is expected to be small, as
         * all the data is loaded into the driver's memory.
         */
        // Print the contents of rdd4
        List<Tuple2<String, Integer>> list = rdd4.collect();
        for (Tuple2<String, Integer> t : list) {
            System.out.println(t._1 + "===" + t._2);
        }
    }
}






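Feel free to follow my WeChat official account: 小秋的博客. CSDN blog: https://blog.csdn.net/xiaoqiu_cr GitHub: https://github.com/crr121 Contact email: rongchen633@gmail.com If you have any questions, leave me a message.
Original article: https://www.cnblogs.com/flyingcr/p/10327067.html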