MapReduce Test Progress

After another round of fairly systematic study, I finally managed to implement the following with my own code:

Counting the Top 10 most popular videos/articles by access count (video/article).

The implementation runs MapReduce twice:

The first job processes the raw data, emitting each video or article ID as the key and the total access count as the value;

The second job sorts that output by access count to surface the most popular videos or articles; a sketch of the data flow follows.
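For concreteness, here is a hedged sketch of the data flow, assuming each input line is a comma-separated log record whose sixth field (index 5; the IDs below are hypothetical) is the content ID:

// hypothetical input lines (other fields elided):
//   ...,...,...,...,...,video_001
//   ...,...,...,...,...,video_001
// after job 1 (tab-separated, the TextOutputFormat default):
//   video_001	2
// after job 2 (key and value swapped, records sorted by count):
//   2	video_001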

First MapReduce job

package test;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Mapper;

public class test01 {
    public static class MyMap extends Mapper<Object, Text, Text, IntWritable> {
        private static final Text id = new Text();
        private static final IntWritable one = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is a comma-separated log record;
            // field index 5 holds the video/article ID.
            String[] arr = value.toString().split(",");
            id.set(arr[5]);
            context.write(id, one);
        }
    }

    public static class MyReduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // Sum the per-record counts for this ID.
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJobName("test01");
        job.setJarByClass(test01.class);
        job.setMapperClass(MyMap.class);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input: raw access log; output: one "<id>\t<count>" line per ID.
        Path in = new Path("hdfs://localhost:9000/usr/hadoop/result.txt");
        Path out = new Path("hdfs://localhost:9000/usr/hadoop/result");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
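One optional improvement, not in the original post: because MyReduce only sums integers (a commutative, associative operation), it can double as a combiner so partial sums are computed map-side and less data is shuffled. A minimal sketch, added in main before job submission:

// Assumption (not in the original code): reuse the reducer as a combiner.
job.setCombinerClass(MyReduce.class);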

Second MapReduce job:

package test;

import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.Mapper;

public class test02 {
    public static class Map extends Mapper<Object, Text, IntWritable, Text> {
        private static final Text id = new Text();
        private static final IntWritable num = new IntWritable();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "<id>\t<count>", as written by the first job.
            // Swap key and value so the shuffle sorts records by count.
            String[] arr = value.toString().split("\t");
            num.set(Integer.parseInt(arr[1]));
            id.set(arr[0]);
            context.write(num, id);
        }
    }

    public static class red extends
            Reducer<IntWritable, Text, IntWritable, Text> {
        @Override
        public void reduce(IntWritable key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            // Identity reduce: records arrive already sorted by count
            // (ascending by default), so just write them back out.
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "test02"); // new Job(...) is deprecated
        job.setJarByClass(test02.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(red.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Read the first job's output; write the sorted result alongside it.
        Path in = new Path(
                "hdfs://localhost:9000/usr/hadoop/result/part-r-00000");
        Path out = new Path("hdfs://localhost:9000/usr/hadoop/result/out");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
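As written, the shuffle sorts IntWritable keys in ascending order, so the most-visited IDs end up at the bottom of the output rather than the top. A minimal sketch, not part of the original post, of a descending sort comparator that puts the largest counts first (DescIntWritableComparator is a hypothetical helper name):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical helper (an assumption, not in the original code):
// inverts the natural IntWritable order so larger counts sort first.
public class DescIntWritableComparator extends WritableComparator {
    public DescIntWritableComparator() {
        super(IntWritable.class, true); // true: instantiate keys for compare()
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return -super.compare(a, b); // flip ascending into descending
    }
}

Register it in test02's main with job.setSortComparatorClass(DescIntWritableComparator.class), and keep a single reducer (job.setNumReduceTasks(1)) so the ordering is global; the Top 10 is then simply the first ten lines of part-r-00000.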
Original post: https://www.cnblogs.com/KYin/p/11939406.html