Big Data Learning 11: MapReduce in Practice (Word Count)

WordCount

Requirement: given a set of text files, count and output the total number of occurrences of each word. For example, the line `hello world hello` contributes `hello 2` and `world 1` to the final counts.

Step 1. Prepare the data

Create a new file:

cd /export/servers
vim wordcount.txt

Put the following content into it and save:

zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hello aleen hello nana hello city hello ciounty hello
zhangsan helllo lisi hello wangwu hello hello hello
zhaoliu zhousna hello hadoop

Upload it to HDFS:

hdfs dfs -mkdir /wordcount/
hdfs dfs -put wordcount.txt /wordcount/
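
To confirm the upload, you can list the directory (a quick sanity check; sizes and timestamps will differ on your cluster):

hdfs dfs -ls /wordcount/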

Step 2. Import the Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>MapReduce_map</artifactId>
    <version>1.0-SNAPSHOT</version>

    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Step 3. Mapper

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    // The map method turns each K1/V1 pair into K2/V2 pairs.
    /*
      Parameters:
         key     : K1, the byte offset of the line within the file
         value   : V1, the text of the line
         context : the context object
     */
    /*
      How K1/V1 becomes K2/V2:
        K1         V1
        0    hello world hadoop
        19   hdfs hive hello
       ---------------------------
        K2            V2
        hello         1
        world         1
        hadoop        1
        hdfs          1
        hive          1
        hello         1
     */

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Output objects, reused for each word in the line
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        // 1: Split the line into words
        String[] split = value.toString().split(" ");

        // 2: Iterate over the words, assembling K2 and V2
        for (String word : split) {
            // 3: Write each K2/V2 pair to the context
            text.set(word);
            longWritable.set(1);
            context.write(text, longWritable);
        }
    }
}


Step 4. Reducer

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        // 1: Iterate over the values for this key and add them up to get V3
        for (LongWritable value : values) {
            count += value.get();
        }
        // 2: Write K3 and V3 to the context
        context.write(key, new LongWritable(count));
    }
}

Step 5. Define the main class, describe the Job, and submit it

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // This method describes the job and submits it

        // 1: Create the job object
        Job job = Job.getInstance(super.getConf(), "wordcount");
        // Needed when running from a packaged jar; without it the job may fail to find its classes
        job.setJarByClass(JobMain.class);
        // 2: Configure the job (eight steps)

        // Step 1: Specify the input format and the input path
        job.setInputFormatClass(TextInputFormat.class);
        // node01 must resolve to your NameNode host (the original used its IP, 192.168.28.128)
        TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount"));

        // Step 2: Specify the Map class and the map-output types
        job.setMapperClass(WordCountMapper.class);
        // The type of K2
        job.setMapOutputKeyClass(Text.class);
        // The type of V2
        job.setMapOutputValueClass(LongWritable.class);

        // Steps 3, 4, 5, and 6 (partition, sort, combine, group) use the defaults
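        // (Optional) Since summing counts is associative and commutative, the Reducer
        // can also serve as a Combiner to cut shuffle traffic:
        // job.setCombinerClass(WordCountReducer.class);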

        // Step 7: Specify the Reduce class and the final output types
        job.setReducerClass(WordCountReducer.class);
        // The type of K3
        job.setOutputKeyClass(Text.class);
        // The type of V3
        job.setOutputValueClass(LongWritable.class);

        // Step 8: Specify the output format
        job.setOutputFormatClass(TextOutputFormat.class);
        // Set the output path
        Path path = new Path("hdfs://node01:8020/wordcount_out");
        TextOutputFormat.setOutputPath(job, path);
        // For a local run, something like this would work instead:
        // TextOutputFormat.setOutputPath(job, new Path("file:///D:\\mapreduce\\output"));

        // Get a FileSystem handle
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
        // The job fails if the output directory already exists, so delete it first
        boolean exists = fileSystem.exists(path);
        if (exists) {
            fileSystem.delete(path, true);
        }

        // Submit the job and wait for it to finish
        boolean success = job.waitForCompletion(true);

        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        // Launch the job through ToolRunner
        int exitCode = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(exitCode);
    }
}

Step 6. Run the job on the MapReduce cluster

  • Package the Java project as a jar. First check that the settings.xml path used by your Maven installation is correct; if it is not, change the settings file path in the IDE settings.
  • A successful build ends with BUILD SUCCESS.
  • Upload the jar to the Linux host and execute it, as sketched below.
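
A sketch of the packaging and submission commands, assuming the shaded jar keeps the default name derived from the pom above and that the classes sit in the default package as written (otherwise use the fully qualified class name):

mvn clean package
hadoop jar target/MapReduce_map-1.0-SNAPSHOT.jar JobMain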

Step 7. Inspect the results

The final word counts are written to the wordcount_out directory on HDFS.
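
Assuming the default single reducer, the job produces one part file in which each line is a word, a tab, and its total count. It can be viewed with:

hdfs dfs -cat /wordcount_out/part-r-00000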

Original post: https://www.cnblogs.com/g414056667/p/13581083.html