seq2sparse(4)之PartialVectorMergeReducer源码分析

继前篇blogseq2sparse(3)之TFParitialVectorReducer源码分析 之后,继续分析下面的代码,本次分析的是PartialVectorMergeReducer的源码,这个reducer是下面的函数调用的:

PartialVectorMerger.mergePartialVectors(partialVectorPaths, outputDir, conf, normPower, logNormalize,
      maxTermDimension[0], sequentialAccess, namedVectors, numReducers);

这个调用是在前面blog分析的makePartialVectors函数之后,先make,然后在merge。这个函数的同样启动了一个Job,不过这个Job和前面的一样,没有Mapper,只有Reducer,下面来分析这个Reducer。这个reducer同样只包含setup和reduce而已,在setup中只是设置了一些基本的参数,这些参数在reduce中会用到;比如normPower,这个参数是作为一个if的条件判断,这里先明确下它的值,方便后面reduce分析。在参数解释中:

  1. --norm (-n) norm                    The norm to use, expressed as either a      
  2.                                       float or "INF" if you want to use the       
  3.                                       Infinite norm.  Must be greater or equal    
  4.                                       to 0.  The default is not to normalize

可以看到它的默认值是not to normalize,其实就是-1;

下面分析reduce:(源码如下:)

Vector vector = new RandomAccessSparseVector(dimension, 10);
    for (VectorWritable value : values) {
      vector.assign(value.get(), Functions.PLUS);
    }
    if (normPower != PartialVectorMerger.NO_NORMALIZING) {
      if (logNormalize) {
        vector = vector.logNormalize(normPower);
      } else {
        vector = vector.normalize(normPower);
      }
    }
    if (sequentialAccess) {
      vector = new SequentialAccessSparseVector(vector);
    }
    
    if (namedVector) {
      vector = new NamedVector(vector, key.toString());
    }
    
    VectorWritable vectorWritable = new VectorWritable(vector);
    context.write(key, vectorWritable);

首先,reduce接受的map输出的key是文件名,value是tokenDocument后的文件所有的单词的集合,这里就可以看出key是没有重复的,所以第一个for循环其实就只执行了一次。但是假如这里又同名的文件,那么这里执行的是什么操作呢?看代码很容易就猜到应该是把同名的文件中的单词对应的次数对应相加,然后作为一个文件,也就是所谓的merge,整合。然后到if判断,前面分析可以知道这个if是不进入的,所以不加以分析。如果硬要分析的话,单看函数名大概可以猜到应该是把出现的次数进行归一化什么之类的,比如本来的单词个数分别是[4,5,2,7],那么经过了if里面的这个次数可能变为[log(4),log(5),log(2),log(7)]之类的东西,这里应该是要防止单词的次数太大,不方便后面的计算吧。最后就是重新改下value的格式,然后就输出了。这里可以看到这个操作的输出其实和前一个make的输出是一模一样的。可以编写下面的代码进行验证:

package mahout.fansy.test.bayes;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.mahout.math.VectorWritable;

public class PartialVectorMergeReducerFollow {
	private static Configuration conf = new Configuration();
	private static String mapOutPath;
	static{
		conf.set("mapred.job.tracker", "ubuntu:9001");
	//	mapOutPath="hdfs://ubuntu:9000/home/mahout/mahout-work-mahout0/20news-vectors/partial-vectors-0/part-r-00000";
		mapOutPath="hdfs://ubuntu:9000/home/mahout/mahout-work-mahout/20news-vectors/tf-vectors/part-r-00000";
	}
	public static void main(String[] args) throws IOException {
		getKeyAndValues();
	}
	
	/**
	 * 获得PartialVectorMerger的map输出;
	 * @return
	 * @throws IOException 
	 */
	public static Map<String,List<VectorWritable>> getKeyAndValues() throws IOException{
		Map<String,List<VectorWritable>> map=new HashMap<String,List<VectorWritable>>();
		
	    FileSystem fs = FileSystem.get(URI.create(mapOutPath), conf);
	    Path path = new Path(mapOutPath);

	    SequenceFile.Reader reader = null;
	    try {
	      reader = new SequenceFile.Reader(fs, path, conf);
	      Writable key = (Writable)
	        ReflectionUtils.newInstance(reader.getKeyClass(), conf);
	      Writable value = (Writable)
	        ReflectionUtils.newInstance(reader.getValueClass(), conf);
	      while (reader.next(key, value)) {
	        String k=key.toString();
	        VectorWritable v=(VectorWritable)value;
	        v=new VectorWritable(v.get());  // 第一种方式
	        if(map.containsKey(k)){ //如果包含则把其value值取出来加上一个新的vectorWritable到list中
	        	List<VectorWritable> list=map.get(k);
	        	list.add(v);
	        	map.put(k, list);
	        }else{                 // 否则直接new一个新的list,添加该vectorWritable到list中
	        	List<VectorWritable> list=new ArrayList<VectorWritable>();
	        	list.clear();
	        	list.add(v);
	     //   	List<VectorWritable> listCopy=new ArrayList<VectorWritable>();
	     //   	listCopy.addAll(list);  // 第二种方式
	        	map.put(k, list);
	        	
	        }
	      }
	    } finally {
	      IOUtils.closeStream(reader);
	    }
		return map;
	}

}

这里有点小纠结的地方,就是value和v的地址是一样的,如果使用第二种方式是不行的,第二种方式没有实现list的深复制,所以v的地址和value的地址是一样的,这样导致map的输出的key是不一样的,但是所有key的value都是一样的;有下面的图像可以大概看出一二:


这里说看出一二是指,前面两次的value值中的单词顺序不是按照一样的规则排序的,如果你把所有的value值都拷贝下来,前后两次对比,就会发现,是一样的。


分享,快乐,成长


转载请注明出处:http://blog.csdn.net/fansy1990 



原文地址:https://www.cnblogs.com/suncoolcat/p/3299339.html