MapReduce -- TF-IDF

通过MapReduce实现 TF-IDF值的统计

输入数据(每行一篇文章,文章ID与内容之间以制表符分隔):文章ID    文章内容

3823890378201539    今天约了姐妹去逛街吃美食,周末玩得很开心啊!
......
......

结果数据:

3823890378201539    开心:0.28558719539400335    吃:0.21277211221173534    了:0.1159152517783012    美食:0.29174432675350614    去:0.18044286652763497    玩:0.27205714412756765    啊:0.26272169358877784    姐妹:0.3983823545319593    逛街:0.33320559604063593    得很:0.45170136842118586    周末:0.2672478858982343    今天:0.16923426566752778    约:0.0946874743049455
......
......

整个处理过程分两步(两个 MapReduce 作业)完成。

第一步主要生成三种格式的文件

1、使用分词工具将文章内容拆分成多个词条,记录每个词条在该文章中出现的次数(A)以及该文章的总词条数(B)。关于分词工具的使用请参考 TF-IDF。
第一步处理后结果:

今天_3823890378201539    A:1,B:13,
周末_3823890378201539    A:1,B:13,
得很_3823890378201539    A:1,B:13,
约_3823890378201539    B:13,A:1,
......

2、记录词条在多少篇文章中出现过

处理后结果:

今天    118
周末    33311
......

3、记录文章总数

处理后结果:

counter    1065

第二步将文件2、3的内容加载到分布式缓存,利用这两个文件的内容,通过 MapReduce 对文件1的每条记录计算 TF-IDF 值:TF-IDF = (A / B) × ln(文章总数 / 出现该词条的文章数)。

对于数据量不大的数据,可以采用加载到缓存的方式;如果数据量过大,则不宜采用这种方式。

源码

Step1.java:

  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.fs.FileSystem;
  3 import org.apache.hadoop.fs.Path;
  4 import org.apache.hadoop.io.LongWritable;
  5 import org.apache.hadoop.io.Text;
  6 import org.apache.hadoop.mapreduce.Job;
  7 import org.apache.hadoop.mapreduce.Mapper;
  8 import org.apache.hadoop.mapreduce.Reducer;
  9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 11 import org.wltea.analyzer.core.IKSegmenter;
 12 import org.wltea.analyzer.core.Lexeme;
 13 
 14 import java.io.IOException;
 15 import java.io.StringReader;
 16 import java.util.HashMap;
 17 import java.util.Map;
 18 import java.util.Map.Entry;
 19 
 20 /**
 21  * Created by Edward on 2016/7/21.
 22  */
 23 public class Step1 {
 24 
 25     public static void main(String[] args)
 26     {
 27         //access hdfs's user
 28         //System.setProperty("HADOOP_USER_NAME","root");
 29 
 30         Configuration conf = new Configuration();
 31         conf.set("fs.defaultFS", "hdfs://node1:8020");
 32 
 33         try {
 34             FileSystem fs = FileSystem.get(conf);
 35 
 36             Job job = Job.getInstance(conf);
 37             job.setJarByClass(RunJob.class);
 38             job.setMapperClass(MyMapper.class);
 39             job.setReducerClass(MyReducer.class);
 40             job.setPartitionerClass(FilterPartition.class);
 41 
 42             //需要指定 map out 的 key 和 value
 43             job.setOutputKeyClass(Text.class);
 44             job.setOutputValueClass(Text.class);
 45 
 46             //设置reduce task的数量
 47             job.setNumReduceTasks(4);
 48 
 49             FileInputFormat.addInputPath(job, new Path("/test/tfidf/input"));
 50 
 51             Path path = new Path("/test/tfidf/output");
 52             if(fs.exists(path))//如果目录存在,则删除目录
 53             {
 54                 fs.delete(path,true);
 55             }
 56             FileOutputFormat.setOutputPath(job, path);
 57 
 58             boolean b = job.waitForCompletion(true);
 59             if(b)
 60             {
 61                 System.out.println("OK");
 62             }
 63 
 64         } catch (Exception e) {
 65             e.printStackTrace();
 66         }
 67     }
 68 
 69     public static class MyMapper extends Mapper<LongWritable, Text, Text, Text > {
 70         @Override
 71         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 72             Map<String, Integer> map = new HashMap<String, Integer>();
 73 
 74             String[] str = value.toString().split("	");
 75             StringReader stringReader = new StringReader(str[1]);
 76             IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);
 77             Lexeme lexeme = null;
 78             Long count = 0l;
 79             while((lexeme = ikSegmenter.next())!=null) {
 80                 String word = lexeme.getLexemeText();
 81                 if(map.containsKey(word)) {
 82                     map.put(word, map.get(word)+1);
 83                 }
 84                 else{
 85                     map.put(word, 1);
 86                 }
 87                 count++;
 88             }
 89             for(Entry<String, Integer> entry: map.entrySet())
 90             {
 91                 context.write(new Text(entry.getKey()+"_"+str[0]), new Text("A:"+entry.getValue()));//tf词条在此文章中的个数
 92                 context.write(new Text(entry.getKey()+"_"+str[0]), new Text("B:"+count));//此文章中的总词条数
 93                 context.write(new Text(entry.getKey()),new Text("1"));//词条在此文章中出现+1,计算词条在那些文章中出现过
 94             }
 95             context.write(new Text("counter"), new Text(1+""));//文章数累加器
 96         }
 97     }
 98 
 99     public static class MyReducer extends Reducer<Text, Text, Text, Text> {
100         @Override
101         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
102 
103             //计算总文章数
104             if(key.toString().equals("conter")) {
105                 long sum = 0l;
106                 for(Text v :values)
107                 {
108                     sum += Long.parseLong(v.toString());
109                 }
110                 context.write(key, new Text(sum+""));
111             }
112             else{
113                 if(key.toString().contains("_")) {
114                     StringBuilder stringBuilder = new StringBuilder();
115                     for (Text v : values) {
116                         stringBuilder.append(v.toString());
117                         stringBuilder.append(",");
118                     }
119                     context.write(key, new Text(stringBuilder.toString()));
120                 }
121                 else {//计算词条在那些文章中出现过
122                     long sum = 0l;
123                     for(Text v :values)
124                     {
125                         sum += Long.parseLong(v.toString());
126                     }
127                     context.write(key, new Text(sum+""));
128                 }
129             }
130         }
131     }
132 }

FilterPartition.java

 1 import org.apache.hadoop.io.Text;
 2 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 3 
 4 /**
 5  * Created by Edward on 2016/7/22.
 6  */
 7 public class FilterPartition extends HashPartitioner<Text, Text> {
 8 
 9     @Override
10     public int getPartition(Text key, Text value, int numReduceTasks) {
11 
12         if(key.toString().contains("counter"))
13         {
14             return numReduceTasks-1;
15         }
16 
17         if(key.toString().contains("_"))
18         {
19             return super.getPartition(key, value, numReduceTasks-2);
20         }
21         else
22         {
23             return numReduceTasks-2;
24         }
25     }
26 }

Step2.java

  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.fs.FileSystem;
  3 import org.apache.hadoop.fs.Path;
  4 import org.apache.hadoop.io.LongWritable;
  5 import org.apache.hadoop.io.Text;
  6 import org.apache.hadoop.mapreduce.Job;
  7 import org.apache.hadoop.mapreduce.Mapper;
  8 import org.apache.hadoop.mapreduce.Reducer;
  9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 11 
 12 import java.io.BufferedReader;
 13 import java.io.FileReader;
 14 import java.io.IOException;
 15 import java.net.URI;
 16 import java.util.HashMap;
 17 import java.util.Map;
 18 
 19 /**
 20  * Created by Edward on 2016/7/22.
 21  */
 22 public class Step2 {
 23     public static void main(String[] args)
 24     {
 25         //access hdfs's user
 26         //System.setProperty("HADOOP_USER_NAME","root");
 27 
 28         Configuration conf = new Configuration();
 29         conf.set("fs.defaultFS", "hdfs://node1:8020");
 30 
 31         try {
 32             FileSystem fs = FileSystem.get(conf);
 33 
 34             Job job = Job.getInstance(conf);
 35             job.setJarByClass(RunJob.class);
 36             job.setMapperClass(MyMapper.class);
 37             job.setReducerClass(MyReducer.class);
 38 
 39             //需要指定 map out 的 key 和 value
 40             job.setOutputKeyClass(Text.class);
 41             job.setOutputValueClass(Text.class);
 42 
 43             //分布式缓存,每个slave都能读到数据
 44                 //词条在多少文章中出现过
 45             job.addCacheFile(new Path("/test/tfidf/output/part-r-00002").toUri());
 46                 //文章的总数
 47             job.addCacheFile(new Path("/test/tfidf/output/part-r-00003").toUri());
 48 
 49             FileInputFormat.addInputPath(job, new Path("/test/tfidf/output"));
 50 
 51             Path path = new Path("/test/tfidf/output1");
 52             if(fs.exists(path))//如果目录存在,则删除目录
 53             {
 54                 fs.delete(path,true);
 55             }
 56             FileOutputFormat.setOutputPath(job, path);
 57 
 58             boolean b = job.waitForCompletion(true);
 59             if(b)
 60             {
 61                 System.out.println("OK");
 62             }
 63         } catch (Exception e) {
 64             e.printStackTrace();
 65         }
 66     }
 67 
 68 
 69     public static class MyMapper extends Mapper<LongWritable, Text, Text, Text > {
 70 
 71         public static Map<String, Double> dfmap = new HashMap<String, Double>();
 72 
 73         public static Map<String, Double> totalmap = new HashMap<String, Double>();
 74 
 75         @Override
 76         protected void setup(Context context) throws IOException, InterruptedException {
 77             URI[] cacheFiles = context.getCacheFiles();
 78             Path pArtNum = new Path(cacheFiles[0].getPath());
 79             Path pArtTotal = new Path(cacheFiles[1].getPath());
 80 
 81             //加载词条在多少篇文章中出现过
 82             BufferedReader buffer = new BufferedReader(new FileReader(pArtNum.getName()));
 83             String line = null;
 84             while((line = buffer.readLine()) != null){
 85                 String[] str = line.split("	");
 86                 dfmap.put(str[0], Double.parseDouble(str[1]));
 87             }
 88 
 89             //加载文章总数
 90             buffer = new BufferedReader(new FileReader(pArtTotal.getName()));
 91             line = null;
 92             while((line = buffer.readLine()) != null){
 93                 String[] str = line.split("	");
 94                 totalmap.put(str[0], Double.parseDouble(str[1]));
 95             }
 96         }
 97 
 98         @Override
 99         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
100 
101             String[] strings = value.toString().split("	");
102             String k = strings[0];
103 
104             if(k.contains("counter")) {
105                 //过滤掉 文章总数
106             }
107             else if(k.contains("_")){
108                 String word = k.split("_")[0];
109                 String[] info = strings[1].split(",");
110                 String n=null;
111                 String num=null;
112                 if(info[0].contains("A")){
113                     n = info[0].substring(info[0].indexOf(":")+1);
114                     num = info[1].substring(info[0].indexOf(":")+1);
115                 }
116                 if(info[0].contains("B")){
117                     num = info[0].substring(info[0].indexOf(":")+1);
118                     n = info[1].substring(info[0].indexOf(":")+1);
119                 }
120                 double result = 0l;
121 
122                 result = (Double.parseDouble(n)/Double.parseDouble(num)) * Math.log( totalmap.get("counter")/dfmap.get(word));
123                 System.out.println("n=" + Double.parseDouble(n));
124                 System.out.println("num=" + Double.parseDouble(num));
125                 System.out.println("counter=" + totalmap.get("counter"));
126                 System.out.println("wordnum=" + dfmap.get(word));
127                 context.write(new Text(k.split("_")[1]), new Text(word+":"+result));
128             }
129             else{
130                 //过滤掉 词条在多少篇文章中出现过
131             }
132         }
133     }
134 
135     public static class MyReducer extends Reducer<Text, Text, Text, Text> {
136         @Override
137         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
138 
139             StringBuilder stringBuilder = new StringBuilder();
140             for(Text t: values){
141                 stringBuilder.append(t.toString());
142                 stringBuilder.append("	");
143             }
144             context.write(key, new Text(stringBuilder.toString()) );
145         }
146     }
147 }
原文地址:https://www.cnblogs.com/one--way/p/5695875.html