【转】mapreduce中的全局文件使用方法:以k-means为例

【转自:http://www.linuxidc.com/Linux/2012-10/71540p3.htm】  

以前有做过在Hadoop编 写程序时使用全局变量的想法,但是最后却没有实现,上网查才看到说Hadoop不支持全局变量。但是有时候编程的时候又会用到,比如编写k-means算 法的时候,如果可以有个全局变量存储中心点该多好呀。其实在hadoop中确实是有相关的实现的,比如可以在mapper中的setup函数中读取一个小 文件,然后从这个文件中取出全局变量的值。

那具体如何实现呢?首先提出一个问题,然后利用这种思想去解决会比较好。首先说下我要实现的问题:我现在有输入数据如下:

  1. 0.0 0.2 0.4 
  2. 0.3 0.2 0.4 
  3. 0.4 0.2 0.4 
  4. 0.5 0.2 0.4 
  5. 5.0 5.2 5.4 
  6. 6.0 5.2 6.4 
  7. 4.0 5.2 4.4 
  8. 10.3    10.4    10.5 
  9. 10.3    10.4    10.5 
  10. 10.3    10.4    10.5 

而且还有一个小数据文件(中心点)如下:

  1. 0   0   0 
  2. 5   5   5 
  3. 10  10  10 

我想做的事情就是把输入数据按照中心点求平均值,即首先我把输入数据分类,比如倒数三行应该都是属于(10,10,10)这个中心点的,那么我的 map就把倒数三行的key都赋值为2,然后value值还是保持这三行不变。在reduce阶段,我求出相同key的sum值,同时求出一共的行数 count,最后我用sum/count得到我想要的按照中心点求出的平均值了。

下面贴代码:

KmeansDriver:

  1. package org.fansy.date927; 
  2.  
  3. import java.io.IOException; 
  4.  
  5. //import org.apache.commons.logging.Log;  
  6. //import org.apache.commons.logging.LogFactory;  
  7. import org.apache.hadoop.conf.Configuration; 
  8. import org.apache.hadoop.filecache.DistributedCache; 
  9. //import org.apache.hadoop.fs.FileSystem;  
  10. import org.apache.hadoop.fs.Path; 
  11. import org.apache.hadoop.io.IntWritable; 
  12. import org.apache.hadoop.io.NullWritable; 
  13. import org.apache.hadoop.io.Text; 
  14. import org.apache.hadoop.mapreduce.Job; 
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
  16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
  17. import org.apache.hadoop.util.GenericOptionsParser; 
  18.  
  19. public class KmeansDriver { 
  20.     /** 
  21.      *   k-means algorithm program   
  22.      */ 
  23.     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
  24.         // TODO Auto-generated method stub  
  25.         Configuration conf=new Configuration(); 
  26.         // set the centers data file  
  27.         <SPAN style="COLOR: #ff0000">Path centersFile=new Path("hdfs://fansyPC:9000/user/fansy/input/centers"); 
  28.         DistributedCache.addCacheFile(centersFile.toUri(), conf);</SPAN> 
  29.         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
  30.         if (otherArgs.length != 2) { 
  31.           System.err.println("Usage: KmeansDriver <in> <out>"); 
  32.           System.exit(2); 
  33.         } 
  34.         Job job = new Job(conf, "kmeans job"); 
  35.         job.setJarByClass(KmeansDriver.class); 
  36.         job.setMapperClass(KmeansM.class); 
  37.         job.setMapOutputKeyClass(IntWritable.class); 
  38.         job.setMapOutputValueClass(DataPro.class); 
  39.         job.setNumReduceTasks(2); 
  40.                 //    job.setCombinerClass(KmeansC.class);  
  41.         job.setReducerClass(KmeansR.class); 
  42.         job.setOutputKeyClass(NullWritable.class); 
  43.         job.setOutputValueClass(Text.class); 
  44.          
  45.         FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 
  46.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 
  47.          
  48.         if(!job.waitForCompletion(true)){ 
  49.             System.exit(1); // run error then exit  
  50.         }     
  51.     } 
  52.  
  53. }
  54. Mapper:
    1. package org.fansy.date927; 
    2.  
    3. import java.io.BufferedReader; 
    4. import java.io.FileReader; 
    5. import java.io.IOException; 
    6. import java.util.ArrayList; 
    7. import java.util.List; 
    8.  
    9. import org.apache.commons.logging.Log; 
    10. import org.apache.commons.logging.LogFactory; 
    11. import org.apache.Hadoop.filecache.DistributedCache; 
    12. import org.apache.hadoop.fs.Path; 
    13. import org.apache.hadoop.io.IntWritable; 
    14. import org.apache.hadoop.io.LongWritable; 
    15. import org.apache.hadoop.io.Text; 
    16. import org.apache.hadoop.mapreduce.Mapper; 
    17.  
    18. public class KmeansM extends Mapper<LongWritable,Text,IntWritable,DataPro>{ 
    19.     private static Log log=LogFactory.getLog(KmeansM.class); 
    20.      
    21.     private double[][] centers; 
    22.     private int dimention_m;  //  this is the k   
    23.     private int dimention_n;   //  this is the features   
    24.  
    25.      
    26.     static enum Counter{Fansy_Miss_Records}; 
    27.     @Override 
    28.     public void setup(Context context) throws IOException,InterruptedException{ 
    29.         <SPAN style="COLOR: #ff0000">Path[] caches=DistributedCache.getLocalCacheFiles(context.getConfiguration());</SPAN> 
    30.         if(caches==null||caches.length<=0){ 
    31.             log.error("center file does not exist"); 
    32.             System.exit(1); 
    33.         } 
    34.         @SuppressWarnings("resource"
    35.         BufferedReader br=new BufferedReader(new FileReader(caches[0].toString())); 
    36.         String line; 
    37.         List<ArrayList<Double>> temp_centers=new ArrayList<ArrayList<Double>>(); 
    38.         ArrayList<Double> center=null
    39.         //  get the file data  
    40.         <SPAN style="COLOR: #ff0000">while((line=br.readLine())!=null){ 
    41.             center=new ArrayList<Double>(); 
    42.             String[] str=line.split(" "); 
    43.             for(int i=0;i<str.length;i++){ 
    44.                 center.add(Double.parseDouble(str[i])); 
    45.             } 
    46.             temp_centers.add(center); 
    47.         }</SPAN> 
    48.         //  fill the centers   
    49.         @SuppressWarnings("unchecked"
    50.         ArrayList<Double>[] newcenters=temp_centers.toArray(new ArrayList[]{}); 
    51.          dimention_m=temp_centers.size(); 
    52.          dimention_n=newcenters[0].size(); 
    53.         centers=new double[dimention_m][dimention_n]; 
    54.         for(int i=0;i<dimention_m;i++){ 
    55.             Double[] temp_double=newcenters[i].toArray(new Double[]{}); 
    56.             for(int j=0;j<dimention_n;j++){ 
    57.                 centers[i][j]=temp_double[j]; 
    58.         //      System.out.print(temp_double[j]+",");  
    59.             } 
    60.     //      System.out.println();  
    61.         } 
    62.     } 
    63.              
    64.     public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{ 
    65.         String[] values=value.toString().split(" "); 
    66.          
    67.         if(values.length!=dimention_n){ 
    68.             context.getCounter(Counter.Fansy_Miss_Records).increment(1); 
    69.             return
    70.         } 
    71.         double[] temp_double=new double[values.length]; 
    72.         for(int i=0;i<values.length;i++){ 
    73.             temp_double[i]=Double.parseDouble(values[i]); 
    74.         } 
    75.         //  set the index  
    76.         double distance=Double.MAX_VALUE; 
    77.         double temp_distance=0.0
    78.         int index=0
    79.         for(int i=0;i<dimention_m;i++){ 
    80.             double[] temp_center=centers[i]; 
    81.             temp_distance=getEnumDistance(temp_double,temp_center); 
    82.             if(temp_distance<distance){ 
    83.                  index=i; 
    84.                 distance=temp_distance; 
    85.             } 
    86.         } 
    87.         DataPro newvalue=new DataPro(); 
    88.         newvalue.set(value, new IntWritable(1)); 
    89.         context.write(new IntWritable(index), newvalue); 
    90.          
    91.     } 
    92.     public static double getEnumDistance(double[] source,double[] other){  //  get the distance  
    93.         double distance=0.0
    94.         if(source.length!=other.length){ 
    95.             return Double.MAX_VALUE; 
    96.         } 
    97.         for(int i=0;i<source.length;i++){ 
    98.             distance+=(source[i]-other[i])*(source[i]-other[i]); 
    99.         } 
    100.         distance=Math.sqrt(distance); 
    101.         return distance; 
    102.     } 

    Reducer:

    1. package org.fansy.date927; 
    2.  
    3. import java.io.IOException; 
    4. import org.apache.Hadoop.io.IntWritable; 
    5. import org.apache.hadoop.io.NullWritable; 
    6. import org.apache.hadoop.io.Text; 
    7. import org.apache.hadoop.mapreduce.Reducer; 
    8.  
    9. public class KmeansR extends Reducer<IntWritable,DataPro,NullWritable,Text> { 
    10.      
    11.     public void reduce(IntWritable key,Iterable<DataPro> values,Context context)throws InterruptedException, IOException{ 
    12.         // get dimension first  
    13.         int dimension=0
    14.         for(DataPro val:values){ 
    15.             String[] datastr=val.getCenter().toString().split(" "); 
    16.             dimension=datastr.length; 
    17.             break
    18.         } 
    19.         double[] sum=new double[dimension]; 
    20.         int sumCount=0
    21.         for(DataPro val:values){ 
    22.             String[] datastr=val.getCenter().toString().split(" "); 
    23.             sumCount+=val.getCount().get(); 
    24.             for(int i=0;i<dimension;i++){ 
    25.                 sum[i]+=Double.parseDouble(datastr[i]); 
    26.             } 
    27.         } 
    28.         //  calculate the new centers  
    29. //      double[] newcenter=new double[dimension];  
    30.         StringBuffer sb=new StringBuffer(); 
    31.         for(int i=0;i<dimension;i++){ 
    32.             sb.append(sum[i]/sumCount+" "); 
    33.         } 
    34.         context.write(nullnew Text(sb.toString())); 
    35.     } 

    DataPro:

    1. package org.fansy.date927; 
    2.  
    3. import java.io.DataInput; 
    4. import java.io.DataOutput; 
    5. import java.io.IOException; 
    6.  
    7. import org.apache.hadoop.io.IntWritable; 
    8. import org.apache.hadoop.io.Text; 
    9. import org.apache.hadoop.io.WritableComparable; 
    10.  
    11. public class DataPro implements WritableComparable<DataPro>{ 
    12.  
    13.     private Text center; 
    14.     private IntWritable count; 
    15.      
    16.     public DataPro(){ 
    17.         set(new Text(),new IntWritable()); 
    18.     } 
    19.     public void set(Text text, IntWritable intWritable) { 
    20.         // TODO Auto-generated method stub  
    21.         this.center=text; 
    22.         this.count=intWritable; 
    23.     } 
    24.      
    25.     public Text getCenter(){ 
    26.         return center; 
    27.     } 
    28.     public IntWritable getCount(){ 
    29.         return count; 
    30.     } 
    31.      
    32.      
    33.     @Override 
    34.     public void readFields(DataInput arg0) throws IOException { 
    35.         // TODO Auto-generated method stub  
    36.         center.readFields(arg0); 
    37.         count.readFields(arg0); 
    38.     } 
    39.  
    40.     @Override 
    41.     public void write(DataOutput arg0) throws IOException { 
    42.         // TODO Auto-generated method stub  
    43.         center.write(arg0); 
    44.         count.write(arg0); 
    45.     } 
    46.  
    47.     @Override 
    48.     public int compareTo(DataPro o) { 
    49.         // TODO Auto-generated method stub  
    50.         int cmp=count.compareTo(o.count); 
    51.         if(cmp!=0){ 
    52.             return cmp; 
    53.         } 
    54.         return center.compareTo(o.center); 
    55.     } 
    56.  

    这里自定义了一个DataPro数据类型,主要是为了为以后编写真正的k-means算法时使用combiner做准备,具体思想可以参考上篇combine操作。

    输出文件如下:

    1. 0.39999999999999997 0.20000000000000004 0.4000000000000001   
    2. 5.0 5.2 5.4   
    3. 10.3    10.4    10.5   

    这篇文章参考了 http://www.linuxidc.com/Linux/2012-10/71538.htm 部分实现,在那篇文章中的k-means思想的主要思想是:使用map读入centers文件值,然后把数据文件data作为一个全局量,然后reduce在进行求中心点的操作。(或许我理解错了也说不定)

    做完这一步后,如果要编写K-means算法就可以说是已经实现了大半了,剩下的就是设置下输入和输出路径,然后进行迭代了。

    【转自:http://www.linuxidc.com/Linux/2012-10/71540p3.htm】
原文地址:https://www.cnblogs.com/aitixiaocai/p/3561935.html