MapReduce

在4个服务器中启动4个map任务

每个map任务读取目标文件,每读一行就拆分一下单词,并记下来次单词出现了一次

目标文件的每一行都处理完成后,需要把单词进行排序

在3个服务器上启动reduce任务

每个reduce获取一部分map的处理结果

reduce任务进行汇总统计,输出最终的结果数据

MapReduce是一个非常优秀的编程模型,已经把绝大多数的工作做完了,我们只需要关心2个部分:

map处理逻辑——对传进来的一行数据如何处理?输出什么信息?

reduce处理逻辑——对传进来的map处理结果如何处理?输出什么信息?
编写好这两个核心业务逻辑之后,只需要几行简单的代码把map和reduce装配成一个job,然后提交给Hadoop集群就可以了。

至于其它的复杂细节,例如如何启动map任务和reduce任务、如何读取文件、如对map结果排序、如何把map结果数据分配给reduce、reduce
如何把最终结果保存到文件等等,MapReduce框架都帮我们做好了,而且还支持很多自定义扩展配置,例如如何读文件、如何组织map或者reduce的输出结果等等,




package com.yuejiesong.MRAction;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//得到输入的每一行数据
String line = value.toString();
//通过空格分隔
String[] words = line.split(" ");
//循环遍历输出
for(String word:words){
context.write(new Text(word),new IntWritable(1));
}
}
}
--------------------------------------------------------------
package com.yuejiesong.MRAction;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
Integer count = 0;
for(IntWritable value:values){
count += value.get();
}
context.write(key,new IntWritable(count));
}
}
-------------------------------------------------------------package com.yuejiesong.MRAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountMapReduce {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//创建配置对象
Configuration conf = new Configuration();
//创建job对象
Job job = Job.getInstance(conf,"wordcount");
//设置运行的job类
job.setJarByClass(WordCountMapReduce.class);
//设置mapper类
job.setMapperClass(WordCountMapper.class);
//设置reducer类
job.setReducerClass(WordCountReducer.class);
//设置map输出的key value
job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置reduce输出的key,value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置输入输出路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//提交job
boolean b = job.waitForCompletion(true);
if(!b){
System.out.println("wordcount task fail!");
}
}
}
--------------------------------------------------------------
在pom.xml目录下打包
mvn clean

mvn package

准备数据

(base) [root@pyspark ~]# hdfs dfs -ls /wc20200410input/
Found 4 items
-rw-r--r-- 1 root supergroup 20 2020-04-11 12:40 /wc20200410input/1.txt
-rw-r--r-- 1 root supergroup 14 2020-04-11 12:40 /wc20200410input/2.txt
-rw-r--r-- 1 root supergroup 21 2020-04-11 12:40 /wc20200410input/3.txt
-rw-r--r-- 1 root supergroup 23 2020-04-11 12:40 /wc20200410input/4.txt

hadoop jar HiveCountToolsUDF-1.0-SNAPSHOT.jar com.yuejiesong.MRAction.WordCountMapReduce /wc20200410input/ /wc20200410output

(base) [root@pyspark ~]# hdfs dfs -text /wc20200410output/part-r-00000
good 5
has 1
is 3
the 1
today 2
weather 3

从job提交到执行完成这个过程。

(1)客户端提交任务
Client提交任务时会先到HDFS中查看目标文件的大小,了解要获取的数据的规模,然后形成任务分配的规划,例如:
a.txt 0-128M交给一个task,128-256M 交给一个task,b.txt 0-128M交给一个task,128-256M交给一个task ...,形成规划文件job.split。
然后把规划文件job.split、jar、配置文件xml提交给yarn(Hadoop集群资源管理器,负责为任务分配合适的服务器资源)

(2)启动appmaster
注:appmaster是本次job的主管,负责maptask和reducetask的启动、监控、协调管理工作。
yarn找一个合适的服务器来启动appmaster,并把job.split、jar、xml交给它。
(3)启动maptask
Appmaster启动后,根据固化文件job.split中的分片信息启动maptask,一个分片对应一个maptask。
分配maptask时,会尽量让maptask在目标数据所在的datanode上执行。
(4)执行maptask
Maptask会一行行地读目标文件,交给我们写的map程序,读一行就调一次map方法,
map调用context.write把处理结果写出去,保存到本机的一个结果文件,
这个文件中的内容是分区且有序的。
分区的作用就是定义哪些key在一组,一个分区对应一个reducer。
(5)启动reducetask
Maptask都运行完成后,appmaster再启动reducetask,maptask的结果中有几个分区就启动几个reducetask。
(6)执行reducetask
reducetask去读取maptask的结果文件中自己对应的那个分区数据,例如reducetask_01去读第一个分区中的数据。
reducetask把读到的数据按key组织好,传给reduce方法进行处理,处理结果写到指定的输出路径。

原文地址:https://www.cnblogs.com/songyuejie/p/12672810.html