MapReduce数据处理

数据格式如下:

清洗数据,提取下列信息做精细化操作:

ip--->城市 city(IP)

date--> time:2016-11-10 00:01:03(日期)

day: 10(天数)

traffic:62(流量)

type:article/video(类型:视频video或文章article)

id:11325(视频或者文章的id)

 1 public static class Map extends Mapper<Object, Text, IntWritable, Text> {
 2         private static Text goods = new Text();
 3         private static IntWritable num = new IntWritable();
 4 
 5         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 6             String line = value.toString();
 7             String arr[] = line.split("[ 	/:]");
 8             num.set(Integer.parseInt(arr[0]));
 9             goods.set(arr[1] + "	" + arr[4] + "-11-10" + " " + arr[5] + ":" + arr[6] + ":" + arr[7] + "	"  + arr[10] + "	" + arr[11]+ "	" + arr[12]);
10             context.write(num,goods);
11         }
12     }
13     
14     public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text> {
15         public void reduce(IntWritable key, Iterable<Text> values, Context context)
16                 throws IOException, InterruptedException {
17             for (Text val : values) {
18                 context.write(key,val);
19             }
20         }
21     }

统计视频/文章的访问次数:

/**
 * Counting mapper: for every tab-separated input line, emits the pair
 * (field 0 + TAB + field 5, 1) so that the reducer can sum occurrences per key.
 *
 * <p>Lines with fewer than six fields are skipped and counted instead of
 * failing the task.
 *
 * <p>NOTE(review): the grouping key is built from fields 0 and 5 — confirm
 * these are the intended type / id columns of the input.
 */
public static class doMapper extends Mapper<Object, Text, Text, IntWritable> {
    /** Constant count of one emitted for every accepted line. */
    public static final IntWritable one = new IntWritable(1);

    /** Highest field index read below is 5, so at least 6 fields are required. */
    private static final int MIN_FIELDS = 6;

    // Reused output key; per-instance so mapper instances never share mutable state.
    private final Text goods = new Text();

    /**
     * Emits (field 0 + TAB + field 5, 1) for one input line.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one tab-separated record
     * @param context Hadoop context used to emit the output pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] arr = value.toString().split("\t");
        if (arr.length < MIN_FIELDS) {
            // Blank or truncated line: skip it rather than abort the job.
            context.getCounter("Count", "MalformedLines").increment(1);
            return;
        }
        goods.set(arr[0] + "\t" + arr[5]);
        context.write(goods, one);
    }
}
    /**
     * Summing reducer: adds up every count received for a key and emits
     * (key, total).
     */
    public static class doReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        /** Output holder, overwritten for each key before it is written. */
        private final IntWritable outCount = new IntWritable();

        /**
         * Writes the key together with the sum of its values.
         *
         * @param key     grouping key produced by the mapper
         * @param values  counts emitted for this key
         * @param context Hadoop context used to emit the output pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            outCount.set(total(values));
            context.write(key, outCount);
        }

        /** Returns the int sum of all counts in {@code counts}. */
        private static int total(Iterable<IntWritable> counts) {
            int acc = 0;
            for (IntWritable count : counts) {
                acc = acc + count.get();
            }
            return acc;
        }
    }

统计最受欢迎的前十个:

// Parallel lists filled by the Reduce class below; entries at the same index
// belong to the same record.
// NOTE(review): being static, they are only visible inside the JVM that runs
// the reducer — confirm that whatever reads them runs in the same process.

/** Second tab-separated field of each value handled by the reducer. */
public static List<String> Texts = new ArrayList<>();

/** String form of the reducer key for each value handled by the reducer. */
public static List<String> Values = new ArrayList<>();

/** First tab-separated field of each value handled by the reducer. */
public static List<String> Names = new ArrayList<>();
    /**
     * Key comparator that reverses the natural ordering of the keys, so that
     * larger {@code IntWritable} keys are delivered first.
     */
    public static class Sort extends WritableComparator {
        public Sort() {
            // The class given here must be the data type of the map output key.
            super(IntWritable.class, true);
        }

        /**
         * Compares two keys in descending order.
         *
         * @param a first key
         * @param b second key
         * @return a negative value if {@code a} is larger than {@code b}, zero if
         *         they are equal, a positive value otherwise
         */
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"}) // raw signature is dictated by WritableComparator
        public int compare(WritableComparable a, WritableComparable b) {
            // Swap the operands to reverse the order. Negating a.compareTo(b) would
            // give the wrong sign if compareTo ever returned Integer.MIN_VALUE.
            // Use a.compareTo(b) here instead for ascending order.
            return b.compareTo(a);
        }
    }
    public static class Map extends Mapper<Object, Text, IntWritable, Text> {
        private static Text Name = new Text();
        private static IntWritable num = new IntWritable();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String arr[] = line.split("	");
            num.set(Integer.parseInt(arr[2]));
            Name.set(arr[0] + "	" + arr[1]);
            context.write(num, Name);
        }
    }
    public static class Reduce extends Reducer<IntWritable, Text, Text, IntWritable> {
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                for(int i=0;i < 10;i++) {
                    String arr[] = val.toString().split("	");
                    Texts.add(arr[1]);
                    Names.add(arr[0]);
                    Values.add(key.toString());
                }
                context.write(val, key);
            }
        }
    }
原文地址:https://www.cnblogs.com/yuanxiaochou/p/11854085.html