MapReduce小试牛刀-WordCount实战

MapReduce是一种分布式计算模式,由Google提出,主要用于搜索领域,解决海量数据的计算问题,mapRedcure是分布式运行的,分为map阶段和Reduce阶段。
Map阶段是一个独立的程序,可在多个节点同时运行,每个节点处理一部分数据。
Reduce阶段也是一个独立的程序,可以再一个或者多个节点同时运行,每个节点处理一部分数据。
观摩笔记

WordCount实战

package com.imooc.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.Iterator;

/**
 * 需求:读取hdfs上的hello word.txt文件,计算文件中每个单词出现的总次数
 *
 * 原始文件hello.txt内容如下:
 * hello you
 * hello me
 *
 * 最终需要的结果形式如下:
 * hello 2
 * me 1
 * you 1
 *
 *
 * @author zhangshao
 * @date 2021/3/28 4:59 下午
 */
public class WordCountJob {
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
        /**
         * 需要实现map函数,
         * 这个map函数就是可以接收<k1,v1>,产生<K2,v2>
         * @param k1 代表每一行数据的行首偏移量,
         * @param v1 v1代表的是每一行内容
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            String[] words = v1.toString().split(" ");
            for(String word:words){
                //把迭代出来的单词封装<k2,v2>的形式
                Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1L);
                //把<k2,v2>写出去
                context.write(k2,v2);
            }
        }
    }

    /**
     * reduce 阶段
     */
    public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable> {


        @Override
        public void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            //创建一个sum变量,保存v2s之和
            long sum = 0L;
            for(LongWritable v2:v2s){
                sum +=v2.get();
            }
            //组装k3,v3
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            //把结果写出去
            context.write(k3,v3);
        }
    }

    /**
     * 组装jon = map+reduce
     */
    public static void main(String[] args) {
        try{

            if(args.length!=2){
                //如果传递的参数不够,程序直接退出
                System.exit(100);

            }

            //指定job需要的配置参数
            Configuration conf = new Configuration();
            //创建一个job
            Job job = Job.getInstance(conf);
            //注意:这一行必须设置,否则在集群中执行的时候找不到WordCountJob这个类
            job.setJarByClass(WordCountJob.class);

            //指定输入路径(可以是文件,也可以是路径)
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            //输出路径(只能指定一个不存在的路径)
            FileOutputFormat.setOutputPath(job,new Path(args[1]));

            //指定map相关的代码
            job.setMapperClass(MyMapper.class);
            //指定k2的类型
            job.setMapOutputKeyClass(Text.class);
            //指定v2的类型
            job.setMapOutputValueClass(LongWritable.class);

            //指定reduce相关的类型
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);

            //提交job
            job.waitForCompletion(true);

        }catch (Exception e){
            e.printStackTrace();
        }

    }


}

主代码开发完毕后,需要打jar包到集群上去执行,需要在pom中添加maven的编译打包插件。

<build>
        <plugins>
            <!-- compiler插件, 设定JDK版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <source>1.8</source>
                    <target>1.8</target>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

注意:在添加完build相关编译依赖后,还需要在pom文件中的hadoop-client和log4j依赖中增加scope属性,值为provided,表示只在编译的时候使用该依赖,在执行及打包的时候不使用,因为hadoop-client和log4j依赖在集群中都是有的,打jar的时候不需要打进去。

<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.29</version>
            <scope>provided</scope>
        </dependency>
        <dependency>    
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.29</version>
            <scope>provided</scope>
        </dependency>

之后使用编译命令打包或idea中点击install

mvn clean package -DskipTests

随后将jar上传到集群中。
创建hello.txt文件

[root@bigdata01 ~]# vi hello.txt
hello you
hello me

接下来就可以向集群中提交MapReduce任务了。
具体命令如下:

hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test/hello.txt /out
  • hadoop:表示使用hadoop脚本提交任务,其实这里也可以使用yarn,从hadoop2.x开始支持yarn,不过也兼容hadoop1
  • jar:表示执行jar包
  • db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar:表示具体的jar包路径
  • com.imooc.mr.WordCountJob:表示要执行的mapreduce代码的全路径
  • /test/hello.txt:指定mapreduce接收到的第一个参数,代表的是输入路径。
  • /out:指定mapreduce接收到的第二个参数,代表的是输出目录,这里的输出目录必须是不存在的,MapReduce程序在执行之前会检测这个输出目录,如果存在会报错,因为每次执行任务都需要创建一个新的输出目录来存储结果数据。

查看任务执行状态

2021-04-03 09:56:53,269 INFO mapreduce.Job:  map 0% reduce 0%
2021-04-03 09:56:59,409 INFO mapreduce.Job:  map 100% reduce 0%
2021-04-03 09:57:05,462 INFO mapreduce.Job:  map 100% reduce 100%
2021-04-03 09:57:06,488 INFO mapreduce.Job: Job job_1617414772389_0001 completed successfully

也可以在web界面上查看执行情况。
访问地址:http://bigdata01:8088

查看执行结果

[root@bigdata01 ~]# hdfs dfs -ls /out/
Found 2 items
-rw-r--r--   1 root supergroup          0 2021-04-03 09:57 /out/_SUCCESS
-rw-r--r--   1 root supergroup         19 2021-04-03 09:57 /out/part-r-00000

_SUCCESS文件是一个标记文件,代表该任务执行成功。
此外,part后面的-r表示这个结果文件是由reduce步骤产生的,如果一个mapreduce只有map阶段没有reduce阶段,那么产生的结果文件是part-m-xxxx

[root@bigdata01 ~]# hdfs dfs -cat /out/part-r-00000
hello   2
me      1
you     1
原文地址:https://www.cnblogs.com/shine-rainbow/p/wordcountcount.html