Hadoop 1.2.0 Development Notes (8)

My consistent approach is to understand the basics of a system first and only then dive into the advanced parts; going against this step-by-step order would exceed what I can absorb. As the ancients said: learning has its fundamentals and its details, affairs have their endings and beginnings; knowing what comes first and what comes last is to grasp the whole way. So let us start from the basics (the image-server development mentioned in my previous post will be set aside for later).

The WordCount word-count program described in my first article ran in a standalone environment; now let's rework it to run in a single-machine pseudo-distributed environment.

Create a new WordCount class that extends Configured and implements the Tool interface:

import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {

    public static class Map extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Package the compiled classes under bin/ into a temporary jar and
        // put the Hadoop configuration directory on the classpath.
        File jarFile = EJob.createTempJar("bin");
        EJob.addClasspath("/usr/hadoop/conf");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);

        /** Create a job and give it a name so its progress can be tracked. **/
        Job job = new Job(getConf());
        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());

        /**
         * When a job runs on a Hadoop cluster, the code must be packaged into
         * a jar file (Hadoop distributes this file across the cluster);
         * setJarByClass gives Hadoop a class from which it locates the
         * containing jar file.
         **/
        job.setJarByClass(WordCount.class);
        job.setJobName("wordcount");

        /**
         * No input format is configured here because we rely on the default
         * TextInputFormat: it splits text files into InputSplits line by
         * line, and LineRecordReader parses each InputSplit into <key, value>
         * pairs, where key is the position of the line within the file and
         * value is the line itself.
         **/

        /** Set the output key and value types of the map and reduce functions. **/
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        /** Set the Mapper, Combiner, and Reducer classes to use. **/
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        /** Set the input and output paths. **/
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        /** Submit the job and wait for it to complete. **/
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Input and output paths on HDFS (hdfs://localhost:9000 in the
        // pseudo-distributed setup)
        String[] arg = { "/test/input", "/test/output" };
        int ret = ToolRunner.run(new WordCount(), arg);
        System.exit(ret);
    }
}
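
Once the job has completed, the counts sit under /test/output as plain text files, one "word<TAB>count" pair per line. For reference, here is a minimal sketch of reading that output back from HDFS; the class name PrintResult is my own placeholder, and it assumes the configuration on the classpath points fs.default.name at the pseudo-distributed HDFS (hdfs://localhost:9000) and that the reducer's first output file carries the standard new-API name part-r-00000.

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrintResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Picks up fs.default.name from the configuration files on the
        // classpath, e.g. hdfs://localhost:9000
        FileSystem fs = FileSystem.get(conf);
        // part-r-00000 is the first (here the only) reducer's output file
        Path result = new Path("/test/output/part-r-00000");
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(result)));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line); // each line: word \t count
        }
        reader.close();
        fs.close();
    }
}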

Since I am testing the word-count program above in a pseudo-distributed environment, the class has to be packaged into a jar file; here I take the approach of generating a temporary jar file from within the program itself:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;

public class EJob {

    // Accumulates the classpath entries added via addClasspath()
    private static List<URL> classPath = new ArrayList<URL>();

    // Packages everything under the given root directory into a temporary
    // jar file that is deleted when the JVM exits.
    public static File createTempJar(String root) throws IOException {
        if (!new File(root).exists()) {
            return null;
        }
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
        final File jarFile = File.createTempFile("EJob-", ".jar", new File(
                System.getProperty("java.io.tmpdir")));

        // Clean up the temporary jar on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                jarFile.delete();
            }
        });

        JarOutputStream out = new JarOutputStream(
                new FileOutputStream(jarFile), manifest);
        createTempJarInner(out, new File(root), "");
        out.flush();
        out.close();
        return jarFile;
    }

    // Recursively adds the files under f to the jar, using base as the
    // entry name prefix.
    private static void createTempJarInner(JarOutputStream out, File f,
            String base) throws IOException {
        if (f.isDirectory()) {
            File[] fl = f.listFiles();
            if (base.length() > 0) {
                base = base + "/";
            }
            for (int i = 0; i < fl.length; i++) {
                createTempJarInner(out, fl[i], base + fl[i].getName());
            }
        } else {
            out.putNextEntry(new JarEntry(base));
            FileInputStream in = new FileInputStream(f);
            byte[] buffer = new byte[1024];
            int n = in.read(buffer);
            while (n != -1) {
                out.write(buffer, 0, n);
                n = in.read(buffer);
            }
            in.close();
        }
    }

    // Returns a class loader that sees the entries added via addClasspath(),
    // falling back through the context and system class loaders as parents.
    public static ClassLoader getClassLoader() {
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        if (parent == null) {
            parent = EJob.class.getClassLoader();
        }
        if (parent == null) {
            parent = ClassLoader.getSystemClassLoader();
        }
        return new URLClassLoader(classPath.toArray(new URL[0]), parent);
    }

    public static void addClasspath(String component) {
        if ((component != null) && (component.length() > 0)) {
            try {
                File f = new File(component);
                if (f.exists()) {
                    URL key = f.getCanonicalFile().toURI().toURL();
                    if (!classPath.contains(key)) {
                        classPath.add(key);
                    }
                }
            } catch (IOException e) {
                // Ignore entries that cannot be resolved to a URL
            }
        }
    }
}

Finally, run the main method of the WordCount class above. Remember to first upload the files to be counted into the /test/input directory of the HDFS file system (you can upload them programmatically, as in my previous article, or through the Eclipse UI).
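
For completeness, here is a minimal sketch of doing that upload programmatically with the FileSystem API. The class name UploadInput and the local path /home/hadoop/words.txt are placeholders of my own; the sketch assumes the Hadoop configuration on the classpath points fs.default.name at the pseudo-distributed HDFS.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UploadInput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Reads fs.default.name from the configuration on the classpath,
        // e.g. hdfs://localhost:9000 in the pseudo-distributed setup
        FileSystem fs = FileSystem.get(conf);
        // Make sure the input directory exists, then copy the local file up
        fs.mkdirs(new Path("/test/input"));
        fs.copyFromLocalFile(new Path("/home/hadoop/words.txt"),
                new Path("/test/input/words.txt"));
        fs.close();
    }
}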

---------------------------------------------------------------------------  

This Hadoop 1.2.0 Development Notes series is my own original work.

Please credit the source when reposting: 博客园 (cnblogs), 刺猬的温驯

Post link: http://www.cnblogs.com/chenying99/archive/2013/06/02/3113474.html
