hadoop编程实践

项目文件:Github

需求一:

package test.dataclean;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/*
 * @ author:Kouch
 * 
 *  “清洗”思路:
 *      1 map: 获取的一行数据;判断一行字符串长度;
 *      2 reduce:
 * 
 *  注:结合具体需求;
 */

public class DataHandle1 {
    
    //map
    public static class Map extends Mapper<Object,Text,Text,Text>{
        
        private static Text line=new Text();
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
            line=value;
            //测试
            System.out.println("内容:"+line);
            
            //一行字符串长度;
            String str=line.toString();
            //System.out.println("zhuan:"+str);
            
            if(str.length()>20) {
                context.write(line, new Text(""));
            }
        }
    }
    
    //reduce
    public static class Reduce extends Reducer<Text,Text,Text,Text>{
        public void reduce(Text key,Iterable<Text>values,Context context) throws IOException, InterruptedException {
            
            //测试
            //System.out.println("内容:"+key);
            context.write(key, new Text(""));
        }
    }
    
    
    //main
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        
        //配置类
        Configuration conf=new Configuration();
        conf.set("mapred.job.tracker", "localhost:9000");
        
        //获取传参
        //方式一:
        String[] ioArgs=new String[] {"input/dailydata1.txt","out"};
        String[] otherArgs=new GenericOptionsParser(conf,ioArgs).getRemainingArgs();
        if(otherArgs.length!=2) {
            System.err.println("Usage:Data Clean <in> <out> - path?");
            System.exit(2);
        }
        
        //判断输出文件是否存在;存在-删除;
        String url="hdfs://localhost:9000/user/kouch/"+ioArgs[1];
        FileSystem fs=FileSystem.get(URI.create(url), conf);
        if(fs.delete(new Path(url), true)) {//true:文件夹下所有文件;false:如果此文件存在其他文件就不删除
            System.out.println("删除"+url);
        }
        
        //Job设置
        Job job=Job.getInstance();
        job.setJarByClass(Deduplication.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        //设置输入输出目录
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        
        //等待job完成之后再返回结果并退出程序
        System.exit(job.waitForCompletion(true)?0:1);
        
    }
    
    
}

需求二:

package test.dataclean;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/*
 * @ author:Kouch
 * 
 *  “清洗”思路:
 *      1 map: 获取的一行数据;去除错误数据;截取有效字段;输入context;
 *      2 reduce:
 * 
 *  注:结合具体需求;
 *  
 *  统计:get/post/head 请求;
 */

public class DataHandle2 {
    
    //map
    public static class Map extends Mapper<Object,Text,Text,IntWritable>{
        private static final IntWritable one = new IntWritable(1);
        private static Text line=new Text();
        public void map(Object key,Text value,Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            line=value;
            //测试
            //System.out.println("内容:"+line);
            String str=line.toString();
            
            if(!(str.indexOf("%")>0)) {
                //System.out.println("内容:"+line);
                String[] strs=str.split(""");
                //System.out.println("内容:"+strs[1]);
                String need=strs[1];
                
                if(need.startsWith("G")) {
                    //System.out.println("G");
                    context.write(new Text("Get"), one);
                }else if(need.startsWith("H")) {
                    //System.out.println("H");
                    context.write(new Text("Head"), one);
                }else if(need.startsWith("P")){
                    //System.out.println("P");
                    context.write(new Text("Post"), one);
                }else {
                    
                }
            }
            
        }
    }
    
    //reduce
    public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
        
        private IntWritable result = new IntWritable();
        
        public void reduce(Text key,Iterable<IntWritable>values,Context context) throws IOException, InterruptedException {
            
            //测试
            //System.out.println("内容:"+key);
            
            int sum=0;
            
            //迭代累计频率;
            IntWritable val;
            for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
                val = (IntWritable)i$.next();
            }

            this.result.set(sum);
            context.write(key, this.result);
        }
    }
    
    
    //main
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        
        //配置类
        Configuration conf=new Configuration();
        conf.set("mapred.job.tracker", "localhost:9000");
        
        //获取传参
        //方式一:
        String[] ioArgs=new String[] {"input/daya2.txt","out3"};
        String[] otherArgs=new GenericOptionsParser(conf,ioArgs).getRemainingArgs();
        if(otherArgs.length!=2) {
            System.err.println("Usage:Data Clean <in> <out> - path?");
            System.exit(2);
        }
        
        //判断输出文件是否存在;存在-删除;
        String url="hdfs://localhost:9000/user/kouch/"+ioArgs[1];
        FileSystem fs=FileSystem.get(URI.create(url), conf);
        if(fs.delete(new Path(url), true)) {//true:文件夹下所有文件;false:如果此文件存在其他文件就不删除
            System.out.println("删除"+url);
        }
        
        //Job设置
        Job job=Job.getInstance();
        job.setJarByClass(Deduplication.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        //设置输入输出目录
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        
        //等待job完成之后再返回结果并退出程序
        System.exit(job.waitForCompletion(true)?0:1);
        
    }
    
    
}
...................................................
原文地址:https://www.cnblogs.com/floakss/p/11455797.html