Common MapReduce Algorithms

  • Word count
  • Data deduplication
  • Sorting
  • Top K
  • Selection: for example, finding an extreme value, such as the minimum among 1,000,000 records
  • Projection: for example, when processing mobile-internet access logs, picking five of the 11 fields to show each phone's data traffic
  • Grouping: essentially the same as partitioning; for example, when processing mobile-internet access logs, splitting records into two groups, phone numbers and non-phone numbers (see the partitioner sketch after this list)
  • Multi-table join
  • Single-table self-join
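
A minimal sketch of the grouping idea, assuming the map output key is the phone number (or other caller ID) as Text and that an 11-digit numeric string counts as a phone number; the class name PhonePartitioner and the two-reducer setup are illustrative assumptions, not taken from the original post:

package suanfa;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Illustrative partitioner: records whose key looks like an 11-digit
 * phone number go to partition 0, everything else goes to partition 1.
 */
public class PhonePartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        boolean isPhone = key.toString().matches("\\d{11}");
        return isPhone ? 0 : 1;
    }
}

To use it, the job would register it with job.setPartitionerClass(PhonePartitioner.class) and run with two reducers (job.setNumReduceTasks(2)), so each group lands in its own output file.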

Using the Top K approach (here with K = 1) to find the largest number in a file:

package suanfa;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Find the largest number in a file.
 * 
 * @author ahu_lichang
 * 
 */
public class TopKApp {
    static final String INPUT_PATH = "hdfs://chaoren:9000/input";
    static final String OUT_PATH = "hdfs://chaoren:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Delete the output directory if it already exists, otherwise the job fails.
        FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        Job job = Job.getInstance(conf, TopKApp.class.getSimpleName());
        job.setJarByClass(TopKApp.class); // needed so the cluster can find the mapper/reducer classes
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, outPath);
        job.waitForCompletion(true);
    }

    static class MyMapper extends
            Mapper<LongWritable, Text, LongWritable, NullWritable> {
        // Running maximum of all values seen by this map task.
        long max = Long.MIN_VALUE;

        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            long temp = Long.parseLong(v1.toString());
            if (temp > max) {
                max = temp;
            }
        }

        // Emit a single record per map task: the local maximum.
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    static class MyReducer extends
            Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        // Global maximum across the local maxima produced by the mappers.
        long max = Long.MIN_VALUE;

        @Override
        protected void reduce(LongWritable k2, Iterable<NullWritable> v2s,
                Context context) throws IOException, InterruptedException {
            long temp = k2.get();
            if (temp > max) {
                max = temp;
            }
        }

        // Write the global maximum once all keys have been processed.
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            context.write(new LongWritable(max), NullWritable.get());
        }
    }
}
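
A note on the design: the input is assumed to contain one integer per line. Each map task only tracks a running maximum in memory and writes a single record in cleanup(), so at most one value per map task is shuffled across the network; the single reducer then takes the maximum of those local maxima and writes it once in its own cleanup().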

A problem I ran into: when trying to delete a file from HDFS, it complained that the file system was in safe mode and the file could not be deleted.

To delete the file, Hadoop must first leave safe mode. The command to leave safe mode is: hadoop dfsadmin -safemode leave
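
You can check the current state first with hadoop dfsadmin -safemode get, which reports whether safe mode is ON or OFF; the other subcommands are -safemode enter and -safemode wait.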

Original post: https://www.cnblogs.com/ahu-lichang/p/6661596.html