MappReduce例子和总结01

1.读取数据文件,赋给对象并输出

1.数据文件:

name,age,score,sex
tom,10,100,女
susa,9,99,男
hua,60,10,dog

2.创建对象并实现接口WritableComparable或Writable

public class Person02 implements WritableComparable<Person02> {
    private String name;
    private int age;
    private int score;
    private String sex;
@Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.name);
        dataOutput.writeInt(this.age);
        dataOutput.writeInt(this.score);
        dataOutput.writeUTF(this.sex);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.name = dataInput.readUTF();
        this.age=dataInput.readInt();
        this.score=dataInput.readInt();
        this.sex=dataInput.readUTF();
    }

3.编写MappReduce 方法,如果不需要Reducer方法处理,仅mapp方法也可

public class OnlyMapp {
    public static class OnMapp extends Mapper<LongWritable, Text,Person02, NullWritable>{
        Person02 person;
        Text text;
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            person=new Person02();
            text=new Text();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (key.get()!=0){
                //设置汉字编码格式
                String line=new String(value.getBytes(),0,value.getLength(),"GBK");
                String[] split = line.split(",");
                person.setName(split[0]);
                person.setAge(Integer.parseInt(split[1]));
                person.setScore(Integer.parseInt(split[2]));
                person.setSex(split[3]);
                context.write(person,NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf=new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(OnlyMapp.class);
        job.setMapperClass(OnMapp.class);
        job.setOutputKeyClass(Person.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job,"F:\input\person.txt");
        Path path=new Path("F:\out3");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)){
            fs.delete(path,true);
        }
        FileOutputFormat.setOutputPath(job,path);
        job.submit();
    }
}

4.输出结果文件

2.读取Json格式文件,并赋给对象并输出

数据文件:

1.创建对象并实现接口的方法

public class Phone implements Writable {
    private int uid;
    private String phone;
    private String addr;

2.编写MappReduce方法

public class JsonToObject {
    public static class JSMapp extends Mapper<LongWritable, Text,Phone, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            ObjectMapper objectMapper = new ObjectMapper();
            //设置汉字字符编码
            String line=new String(value.getBytes(),0,value.getLength(),"GBK");
            Phone phone = objectMapper.readValue(line, Phone.class);
            context.write(phone,NullWritable.get());
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf=new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(JsonToObject.class);
        job.setMapperClass(JSMapp.class);
        job.setOutputKeyClass(Phone.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job,"F:\input\phone.json");
        Path path=new Path("F:\out4");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)){
            fs.delete(path,true);
        }
        FileOutputFormat.setOutputPath(job,path);
        job.submit();
    }

 3.输出结果文件

原文地址:https://www.cnblogs.com/TFE-HardView/p/11426034.html