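Remotely Submitting a MapReduce Program to a Hadoop Cluster from a Java Application

A standalone Java application carries none of the cluster's configuration, so the program does not know where the cluster is or how to reach its services. The first step is therefore to specify the cluster's location in the configuration class, Configuration. The configuration code is as follows: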


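// true: also load the default resources found on the classpath
Configuration conf = new Configuration(true);
// HDFS NameNode address (fs.default.name is the older, deprecated name of fs.defaultFS)
conf.set("fs.default.name", "hdfs://master:9000");
conf.set("hadoop.job.user", "hadoop");
// Submit to YARN instead of running the job locally
conf.set("mapreduce.framework.name", "yarn");
conf.set("mapreduce.jobtracker.address", "master:9001");
conf.set("yarn.resourcemanager.hostname", "master");
conf.set("mapreduce.jobhistory.address", "master:10020");
// MatrixMP implements Tool; no command-line arguments are passed
ToolRunner.run(conf, new MatrixMP(), null);
These settings mirror part of the Hadoop configuration: they specify the locations of the cluster, HDFS, the JobTracker, and so on. ToolRunner.run() then starts the job, where MatrixMP is the MapReduce class being run. Other settings that may be needed: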

conf.set("yarn.resourcemanager.admin.address", "master:8033");
conf.set("yarn.resourcemanager.address", "master:8032");
conf.set("yarn.resourcemanager.resource-tracker.address", "master:8036");
conf.set("yarn.resourcemanager.scheduler.address", "master:8030");
conf.set("mapreduce.jobhistory.webapp.address", "master:19888");
conf.set("yarn.application.classpath", "/home/hadoop/hadoop/etc/hadoop,"
+"/home/hadoop/hadoop/share/hadoop/common/*,"
+"/home/hadoop/hadoop/share/hadoop/common/lib/*,"
+"/home/hadoop/hadoop/share/hadoop/hdfs/*,"
+"/home/hadoop/hadoop/share/hadoop/hdfs/lib/*,"
+"/home/hadoop/hadoop/share/hadoop/mapreduce/*,"
+"/home/hadoop/hadoop/share/hadoop/mapreduce/lib/*,"
+"/home/hadoop/hadoop/share/hadoop/yarn/*,"
+"/home/hadoop/hadoop/share/hadoop/yarn/lib/*");
conf.set("mapreduce.application.classpath", "/home/hadoop/hadoop/etc/hadoop,"
+"/home/hadoop/hadoop/share/hadoop/common/*,"
+"/home/hadoop/hadoop/share/hadoop/common/lib/*,"
+"/home/hadoop/hadoop/share/hadoop/hdfs/*,"
+"/home/hadoop/hadoop/share/hadoop/hdfs/lib/*,"
+"/home/hadoop/hadoop/share/hadoop/mapreduce/*,"
+"/home/hadoop/hadoop/share/hadoop/mapreduce/lib/*,"
+"/home/hadoop/hadoop/share/hadoop/yarn/*,"
+"/home/hadoop/hadoop/share/hadoop/yarn/lib/*");
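The two classpath values above are identical, so they can be built once and reused. The following is a minimal sketch using only the JDK; the class name HadoopClasspath and its build method are hypothetical helpers introduced for illustration, and the installation path /home/hadoop/hadoop is simply the one used in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds the comma-separated classpath value
// that the article assigns to both classpath keys.
final class HadoopClasspath {
    // Sub-directories of share/hadoop, in the order used in the article.
    private static final String[] MODULES = {"common", "hdfs", "mapreduce", "yarn"};

    static String build(String hadoopHome) {
        List<String> entries = new ArrayList<>();
        entries.add(hadoopHome + "/etc/hadoop");
        for (String module : MODULES) {
            entries.add(hadoopHome + "/share/hadoop/" + module + "/*");
            entries.add(hadoopHome + "/share/hadoop/" + module + "/lib/*");
        }
        return String.join(",", entries);
    }

    public static void main(String[] args) {
        String classpath = build("/home/hadoop/hadoop");
        // With a real Configuration object one would then call:
        //   conf.set("yarn.application.classpath", classpath);
        //   conf.set("mapreduce.application.classpath", classpath);
        System.out.println(classpath);
    }
}
```

Building the value in one place keeps the two keys from drifting apart if the installation directory changes.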
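Inside MatrixMP's run method we also need to call the createTempJar(String root) method shown below. It packages the compiled class files into a jar file. This is needed when submitting from Eclipse; elsewhere, return new File(System.getProperty("java.class.path")) directly returns the jar into which the program has already been packaged. After the Job has been created, call ((JobConf) job.getConfiguration()).setJar(jarFile.toString()); Note: do not include this code when the program is packaged and run on the cluster itself (i.e. with hadoop jar XXX.jar).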

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public static File createTempJar(String root) throws IOException {
    // No class directory (e.g. already running from a jar): fall back to the classpath.
    if (!new File(root).exists()) {
        return new File(System.getProperty("java.class.path"));
    }
    Manifest manifest = new Manifest();
    manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
    final File jarFile = File.createTempFile("EJob-", ".jar", new File(System.getProperty("java.io.tmpdir")));
    // Delete the temporary jar when the JVM exits.
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            jarFile.delete();
        }
    });
    // try-with-resources closes the stream even if packing fails.
    try (JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile), manifest)) {
        createTempJarInner(out, new File(root), "");
    }
    return jarFile;
}

// Recursively adds f to the jar; base is the entry name built up so far.
private static void createTempJarInner(JarOutputStream out, File f, String base) throws IOException {
    if (f.isDirectory()) {
        File[] fl = f.listFiles();
        if (fl == null) {
            return; // the directory could not be read
        }
        if (base.length() > 0) {
            base = base + "/";
        }
        for (File child : fl) {
            createTempJarInner(out, child, base + child.getName());
        }
    } else {
        out.putNextEntry(new JarEntry(base));
        try (FileInputStream in = new FileInputStream(f)) {
            byte[] buffer = new byte[1024];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
        out.closeEntry();
    }
}
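Before submitting, it can help to confirm that a jar built this way really contains the expected class entries. The following is a rough sketch using only the JDK; JarPackCheck, pack, and entryNames are hypothetical names introduced here, and pack is a simplified stand-in for createTempJar above (based on Files.walk), not the same code.

```java
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for checking the contents of a generated jar.
final class JarPackCheck {

    // Packs every regular file under root into jarFile, using '/'-separated relative names.
    static void pack(Path root, Path jarFile) throws IOException {
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
        List<Path> files;
        try (Stream<Path> walk = Files.walk(root)) {
            files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        }
        try (OutputStream fileOut = Files.newOutputStream(jarFile);
             JarOutputStream out = new JarOutputStream(fileOut, manifest)) {
            for (Path file : files) {
                String name = root.relativize(file).toString().replace(File.separatorChar, '/');
                out.putNextEntry(new JarEntry(name));
                Files.copy(file, out);
                out.closeEntry();
            }
        }
    }

    // Returns the sorted names of all non-directory entries in the jar.
    static List<String> entryNames(Path jarFile) throws IOException {
        List<String> names = new ArrayList<>();
        try (JarFile jar = new JarFile(jarFile.toFile())) {
            for (JarEntry entry : Collections.list(jar.entries())) {
                if (!entry.isDirectory()) {
                    names.add(entry.getName());
                }
            }
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the project's "bin" directory, with one fake class file.
        Path root = Files.createTempDirectory("classes-");
        Files.createDirectories(root.resolve("demo"));
        Files.write(root.resolve("demo").resolve("Hello.class"), new byte[] {1, 2, 3});
        Path jar = Files.createTempFile("EJob-", ".jar");
        pack(root, jar);
        System.out.println(entryNames(jar));
    }
}
```

In a real project, entryNames could be pointed at the file returned by createTempJar("bin") to check that the Mapper and Reducer classes were packed under their package paths.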
The complete run method of MatrixMP:

public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    // Package the compiled classes under "bin" into a temporary jar.
    File jarFile = createTempJar("bin");

    // Deprecated constructor; Job.getInstance(getConf(), "MatrixMP") is the newer equivalent.
    Job job = new Job(getConf(), "MatrixMP");
    job.setJarByClass(MatrixMP.class);

    // Tell the job which jar to ship to the cluster.
    ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

    FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/left"));
    FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/right"));
    // Use the current timestamp so each run writes to a fresh output directory.
    FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/" + new Date().getTime()));
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setInputFormatClass(MyFileInputFormat.class);
    job.setOutputFormatClass(MyMultiFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // Block until the job finishes, printing progress to the console.
    job.waitForCompletion(true);
    return job.isSuccessful() ? 0 : 1;
}
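In testing, remote submission with java -jar XXX.jar took 3-5 times longer than remote submission from Eclipse via Run on Hadoop or as a Java Application. The cause and a fix are unknown.
The above is based on: http://blog.csdn.net/fhx007/article/details/42050467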

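Besides the approach above, there is a simpler one: copy the cluster's configuration files core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml, and log4j.properties (for log output) directly into the project's src directory. The conf.set(...) calls above are then unnecessary, but the class files must still be packaged into a jar, so the code in the run method stays the same.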
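This approach relies on the configuration files being visible on the application's classpath at run time, which is where files placed under src normally end up after a build. A small pre-flight check, sketched below with only the JDK, can report any that are missing; SiteConfigCheck and its missing method are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical pre-flight check: reports which site files are not on the classpath.
final class SiteConfigCheck {
    static final List<String> SITE_FILES = Arrays.asList(
            "core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml");

    // Returns the names that the given class loader cannot find as resources.
    static List<String> missing(ClassLoader loader, List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (loader.getResource(name) == null) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> notFound = missing(SiteConfigCheck.class.getClassLoader(), SITE_FILES);
        if (notFound.isEmpty()) {
            System.out.println("All site files are on the classpath.");
        } else {
            System.out.println("Missing from classpath: " + notFound);
        }
    }
}
```

Running this check before ToolRunner.run() makes a misplaced or unpackaged configuration file easier to spot than a job that silently runs against default settings.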


————————————————
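Copyright notice: This is an original article by CSDN blogger "e小王同学V", licensed under CC 4.0 BY-SA. When reposting, please include the original source link and this notice.
Original link: https://blog.csdn.net/ping802363/article/details/78213292

Source: https://www.cnblogs.com/javalinux/p/15006183.html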