Running a MapReduce Job on a Remote Cluster

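1. Write the map and reduce methods.

2. Download the four files core-site.xml, hdfs-site.xml, mapred-site.xml, and yarn-site.xml from the cluster and place them in the root of the src directory, so that they end up on the program's classpath.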
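
Hadoop's Configuration looks up its site files as classpath resources, so a quick check that the four files are actually visible can save a confusing run against the local defaults. The following is only a minimal sketch in plain Java; the class ClusterConfigCheck and the method missingFiles are hypothetical names for illustration and are not part of Hadoop.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Hadoop): reports cluster config files missing from the classpath. */
final class ClusterConfigCheck {
    // The four files named in step 2.
    static final String[] REQUIRED = {
        "core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml"
    };

    /** Returns the names of the required files that the given class loader cannot find. */
    static List<String> missingFiles(ClassLoader loader) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            if (loader.getResource(name) == null) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = missingFiles(Thread.currentThread().getContextClassLoader());
        if (missing.isEmpty()) {
            System.out.println("All four cluster configuration files are on the classpath.");
        } else {
            System.out.println("Missing from the classpath: " + missing);
        }
    }
}
```

Running it from the same project as the driver should report no missing files once the four files sit in the src root.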

3. Write the driver program, and add the following line before obtaining the Job object:

conf.set("mapreduce.app-submission.cross-platform", "true");
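Alternatively, set the mapreduce.app-submission.cross-platform property to true in mapred-site.xml. This setting lets a client on one operating system (for example Windows) submit a job to a cluster running another (for example Linux).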
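
For the mapred-site.xml variant, the entry would look roughly like the fragment below, placed inside the file's existing configuration element. This is a sketch of the standard Hadoop property format; it presumably goes into the local copy placed under src in step 2, which is an assumption, since the text does not say which copy to edit.

```xml
<!-- Inside <configuration> ... </configuration> of mapred-site.xml -->
<property>
  <name>mapreduce.app-submission.cross-platform</name>
  <value>true</value>
</property>
```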

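4. After obtaining the Job object, add the following line:

job.setJar("wc.jar"); // wc.jar is the name of the jar that will be exported in the next step

5. Export this project as a jar named wc.jar; the name must match the one used in step 4. Then place the jar in the project root directory.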
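
Because job.setJar("wc.jar") uses a relative path, the jar has to be found from the directory the program is launched in. The sketch below is a simple pre-submission check in plain Java. It assumes the IDE starts the program with the project root as the working directory, and the class JobJarCheck and the method jarExists are hypothetical names, not Hadoop APIs.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical pre-submission check (not part of Hadoop). */
final class JobJarCheck {
    /** Returns true if a regular file with the given name exists in the given directory. */
    static boolean jarExists(Path directory, String jarName) {
        return Files.isRegularFile(directory.resolve(jarName));
    }

    public static void main(String[] args) {
        // Assumption: the working directory is the project root.
        Path workingDir = Paths.get("").toAbsolutePath();
        String jarName = "wc.jar"; // must equal the argument passed to job.setJar(...)
        if (jarExists(workingDir, jarName)) {
            System.out.println("Found " + workingDir.resolve(jarName));
        } else {
            System.out.println(jarName + " not found in " + workingDir + "; export the jar first.");
        }
    }
}
```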

6. Right-click the Java program in the IDE and choose Run on Hadoop.

My Java program is as follows:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    // Mapper: emits (word, 1) for every whitespace-separated token of an input line.
    public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final IntWritable one = new IntWritable(1);

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer st = new StringTokenizer(value.toString());
            Text text = new Text();
            while (st.hasMoreTokens()) {
                text.set(st.nextToken());
                context.write(text, one);
            }
        }
    }

    // Reducer (also used as the combiner): sums the counts collected for each word.
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        // Picks up the cluster's *-site.xml files from the classpath (step 2).
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Step 3: allow submission from a client whose OS differs from the cluster's.
        conf.set("mapreduce.app-submission.cross-platform", "true");
        Job job = Job.getInstance(conf, "word count");
        // Step 4: the jar exported from this project, located in the project root.
        job.setJar("wc.jar");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(MyMapper.class);
        job.setCombinerClass(MyReducer.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        Path outPath = new Path("/output");
        Path inPath = new Path("/sweeney");
        if (fs.exists(outPath)) { // delete old output so the job does not fail because the directory already exists
            fs.delete(outPath, true);
        }
        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
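
To sanity-check what the job should produce before submitting it to the cluster, the same logic can be mimicked locally. The sketch below is plain Java with no Hadoop classes: it tokenizes lines the way MyMapper does and sums a 1 per token the way MyReducer does. The class LocalWordCount and its count method are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

/** Hypothetical local stand-in for the job; uses no Hadoop classes. */
final class LocalWordCount {
    /** Splits each line on whitespace (the map step) and adds 1 per token (the reduce step). */
    static Map<String, Integer> count(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted by word, like reducer output keys
        for (String line : lines) {
            StringTokenizer st = new StringTokenizer(line);
            while (st.hasMoreTokens()) {
                counts.merge(st.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints {be=2, not=1, or=1, to=2}
        System.out.println(count(Arrays.asList("to be or not", "to be")));
    }
}
```

Comparing this output on a small sample file with the contents of /output after the cluster run is one way to confirm the remote job behaved as expected.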
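----------------
Copyright notice: This is an original article by CSDN blogger "SweeneyZuo", licensed under CC 4.0 BY-SA. When reposting, please include a link to the original source and this notice.
Original link: https://blog.csdn.net/Sweeneyzuo/article/details/83315417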

Source address: https://www.cnblogs.com/javalinux/p/14927030.html