Spark (Part 1): WordCount

1. Create a new Scala project

Add the dependency in Maven:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.4</version>
</dependency>
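
The _2.11 suffix on the artifact ID means this build of spark-core targets Scala 2.11, so the project's Scala version must match. A minimal sketch of the matching Scala dependency (the 2.11.12 patch version here is an assumption; any 2.11.x release works):

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.12</version>
</dependency>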

Write the Scala file:

package com.littlepage.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("wordcount")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    // RDD: Resilient Distributed Dataset
    val fileRDD: RDD[String] = sc.textFile("data/testdata.txt") // one element per line
    val words: RDD[String] = fileRDD.flatMap((x: String) => { x.split(" ") }) // flatten each line into words
    val pairword: RDD[(String, Int)] = words.map((x: String) => { (x, 1) })
    val res: RDD[(String, Int)] = pairword.reduceByKey((x: Int, y: Int) => { x + y })
    res.foreach(println) // foreach is an action: computation starts here
    sc.stop()
  }
}
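
Everything up to reduceByKey is a lazy transformation; foreach is the action that actually triggers the job. With setMaster("local") the pairs print in the local console, but on a real cluster foreach(println) runs on the executors, so the output lands in executor logs rather than on the driver. A small variant (just a sketch, sensible only for test-sized data) that gathers the results to the driver before printing:

// collect() is an action that returns all elements to the driver as an Array
res.collect().foreach(println)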

Execution result:
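
For illustration, assuming data/testdata.txt contains the two lines "hello spark" and "hello world", the program prints one (word, count) pair per distinct word, interleaved with Spark's INFO logging and in no guaranteed order:

(spark,1)
(hello,2)
(world,1)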

The Java version

package com.littlepage.test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.setAppName("java-wordcount");
        conf.setMaster("local");

        JavaSparkContext jsc = new JavaSparkContext(conf);
        // One element per line of the input file
        JavaRDD<String> fileRDD = jsc.textFile("data/testdata.txt");
        // Split each line into words and flatten
        JavaRDD<String> words = fileRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> pairword = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });
        // Sum the counts for each word
        JavaPairRDD<String, Integer> res = pairword.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer oldv, Integer v) throws Exception {
                return oldv + v;
            }
        });
        // foreach is an action: this triggers the actual computation
        res.foreach((a) -> System.out.println(a));
        jsc.stop();
    }
}

Run result:
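
With the same assumed input file as above, the Java version prints the same (word, count) pairs as the Scala program.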

The concise Scala version

package com.littlepage.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("wordcount")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    sc.textFile("data/testdata.txt")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .foreach(println)
  }
}
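
If sorted output is wanted, sortBy (a standard RDD transformation) can be chained in before the action. A sketch:

sc.textFile("data/testdata.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .sortBy(_._2, ascending = false) // highest counts first
  .foreach(println)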
Original article: https://www.cnblogs.com/littlepage/p/11649856.html