Mahout Algorithm Source Code Analysis: Item-based Collaborative Filtering (Part 3), RowSimilarityJob Verification

Mahout version: 0.7, Hadoop version: 1.0.4, JDK: 1.7.0_25 (64-bit).

This post verifies whether the analysis in the previous post was correct, mainly by writing code that reads the output files produced there and by adding log statements to print the relevant variables.

First, write the following test class to examine all of the outputs:

package mahout.fansy.item;

import java.io.IOException;
import java.util.Map;

import mahout.fansy.utils.read.ReadArbiKV;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;

import junit.framework.TestCase;

public class ReadRowSimilarityJobOut extends TestCase {
	// Verify the weights output:
	public void testWeights() throws IOException{
		String path="hdfs://ubuntu:9000/user/mahout/item/temp/weights/part-r-00000";
		Map<Writable,Writable> map= ReadArbiKV.readFromFile(path);
		System.out.println("weights=================");
		System.out.println(map);
	}
	//normsPath
	public void testNormsPath() throws IOException{
		String path="hdfs://ubuntu:9000/user/mahout/item/temp/norms.bin";
		Vector map=getVector(path);
		System.out.println("normsPath=================");
		System.out.println(map);
	}
	//maxValues.bin
	public void testMaxValues() throws IOException{
		String path="hdfs://ubuntu:9000/user/mahout/item/temp/maxValues.bin";
		Vector map=getVector(path);
		System.out.println("maxValues=================");
		System.out.println(map);
	}
	//numNonZeroEntries.bin
	public void testNumNonZeroEntries() throws IOException{
		String path="hdfs://ubuntu:9000/user/mahout/item/temp/numNonZeroEntries.bin";
		Vector map=getVector(path);
		System.out.println("numNonZeroEntries=================");
		System.out.println(map);
	}
	
	//pairwiseSimilarityPath
	public void testPairwiseSimilarityPath() throws IOException{
		String path="hdfs://ubuntu:9000/user/mahout/item/temp/pairwiseSimilarity/part-r-00000";
		
		Map<Writable,Writable> map= ReadArbiKV.readFromFile(path);
		System.out.println("pairwiseSimilarityPath=================");
		System.out.println(map);
	}
	
	//similarityMatrix
	public void testSimilarityMatrix() throws IOException{
		String path="hdfs://ubuntu:9000/user/mahout/item/temp/similarityMatrix/part-r-00000";
		Map<Writable,Writable> map= ReadArbiKV.readFromFile(path);
		System.out.println("similarityMatrix=================");
		System.out.println(map);
	}
	
	// Read a .bin file (a single Vector) from HDFS
	public Vector getVector(String path){
		Configuration conf=new Configuration();
		conf.set("mapred.job.tracker", "ubuntu:9001");
		Vector vector=null;
		try {
			vector = Vectors.read(new Path(path), conf);
		} catch (IOException e) {
			e.printStackTrace();
		}
		return vector;
	}
}
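
The ReadArbiKV helper referenced above is a small utility for dumping SequenceFile contents and is not listed in this post. For readers who do not have it, the following is a minimal hypothetical sketch of what its readFromFile method could look like, built on Mahout's SequenceFileIterable; the class and package names follow the import above, but the actual implementation may differ:

package mahout.fansy.utils.read;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;

public class ReadArbiKV {
	// Hypothetical sketch: read every key/value pair of a SequenceFile into a Map.
	// The real utility used in this post may differ.
	public static Map<Writable,Writable> readFromFile(String path) throws IOException {
		Configuration conf = new Configuration();
		Map<Writable,Writable> map = new HashMap<Writable,Writable>();
		// reuseKeyValueInstances=false so each pair keeps its own Writable instances
		for (Pair<Writable,Writable> record :
				new SequenceFileIterable<Writable,Writable>(new Path(path), false, conf)) {
			map.put(record.getFirst(), record.getSecond());
		}
		return map;
	}
}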

Running the above test produces the following output:

weights=================
{1={103:2.5,102:3.0,101:5.0}, 2={101:2.0,104:2.0,103:5.0,102:2.5}, 3={101:2.5,107:5.0,105:4.5,104:4.0}, 4={101:5.0,106:4.0,104:4.5,103:3.0}, 5={106:4.0,105:3.5,104:4.0,103:2.0,102:3.0,101:4.0}}
normsPath=================
{107:25.0,106:32.0,105:32.5,104:56.25,103:44.25,102:24.25,101:76.25}
maxValues=================
{}
numNonZeroEntries=================
{}
pairwiseSimilarityPath=================
{102={106:0.14972506706560876,105:0.14328432723886902,104:0.12789210656028413,103:0.1975496259559987}, 103={106:0.1424339656566283,105:0.11208890297777215,104:0.14037600977966974}, 101={107:0.10275248635596666,106:0.1424339656566283,105:0.1158457425543559,104:0.16015261286229274,103:0.15548737703860027,102:0.14201473202245876}, 106={}, 107={}, 104={107:0.13472338607037426,106:0.18181818181818182,105:0.16736577623297264}, 105={107:0.2204812092115424,106:0.14201473202245876}}
similarityMatrix=================
{102={101:0.14201473202245876,106:0.14972506706560876,105:0.14328432723886902,104:0.12789210656028413,103:0.1975496259559987}, 103={101:0.15548737703860027,106:0.1424339656566283,105:0.11208890297777215,104:0.14037600977966974,102:0.1975496259559987}, 101={107:0.10275248635596666,106:0.1424339656566283,105:0.1158457425543559,104:0.16015261286229274,103:0.15548737703860027,102:0.14201473202245876}, 106={101:0.1424339656566283,105:0.14201473202245876,104:0.18181818181818182,103:0.1424339656566283,102:0.14972506706560876}, 107={105:0.2204812092115424,104:0.13472338607037426,101:0.10275248635596666}, 104={107:0.13472338607037426,106:0.18181818181818182,105:0.16736577623297264,103:0.14037600977966974,102:0.12789210656028413,101:0.16015261286229274}, 105={107:0.2204812092115424,106:0.14201473202245876,104:0.16736577623297264,103:0.11208890297777215,102:0.14328432723886902,101:0.1158457425543559}}
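
As a quick cross-check that is not part of the original analysis: assuming the Euclidean distance similarity is used here (consistent with the similarity formula quoted further down), the norm of an item is simply the sum of its squared ratings, so the normsPath values can be verified directly from the weights output. A minimal sketch for item 101:

public class NormCheck {
	public static void main(String[] args) {
		// Ratings of item 101 taken from the weights output above (users 1 to 5).
		double[] ratingsOfItem101 = {5.0, 2.0, 2.5, 5.0, 4.0};
		double norm = 0.0;
		for (double r : ratingsOfItem101) {
			norm += r * r; // sum of squared ratings
		}
		System.out.println(norm); // prints 76.25, matching 101:76.25 in normsPath
	}
}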

The first output, weights, matches the earlier analysis exactly, so it is not written up in detail again here. That leaves only pairwiseSimilarityPath and similarityMatrix to analyze:

(1) pairwiseSimilarityPath:

The earlier analysis of this output went wrong at the final reducer step, or more precisely was left unfinished, as shown in the following screenshot (the variable values printed via the added log statements):


It can be seen that the previous post only analyzed as far as the second row (the second and third rows are identical) and never reached the final output. In fact, only one while loop was left unanalyzed:

while (dotsWith.hasNext()) {
  Vector.Element b = dotsWith.next();
  double similarityValue = similarity.similarity(b.get(), normA, norms.getQuick(b.index()), numberOfColumns);
  if (similarityValue >= treshold) {
    similarities.set(b.index(), similarityValue);
  }
}

Let us trace how the value in the fourth row is obtained from the value in the second row. First, normA is the value for item 102 in norms, i.e. 24.25. Next, look at the similarity function:

public double similarity(double dots, double normA, double normB, int numberOfColumns) {
  double euclideanDistance = Math.sqrt(normA - 2 * dots + normB);
  return 1.0 / (1.0 + euclideanDistance);
}

For item 106 the call should be similarity(12.0, 24.25, 32.0, 5), so the return value is 1/(1+sqrt(24.25-2*12+32)) = 0.149725067, which matches the value in the fourth row exactly. Item 102 itself does not appear in the final output because of the line similarities.setQuick(row.get(), 0);, which sets the corresponding entry to 0 so that it is not written out.
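
To make that arithmetic easy to reproduce, here is a small stand-alone sketch (not Mahout code itself) that re-implements the quoted formula and checks the value for item 106:

public class SimilarityCheck {
	// Same formula as the similarity() method quoted above.
	static double similarity(double dots, double normA, double normB, int numberOfColumns) {
		double euclideanDistance = Math.sqrt(normA - 2 * dots + normB);
		return 1.0 / (1.0 + euclideanDistance);
	}

	public static void main(String[] args) {
		// Item 106 against item 102: dots=12.0, normA=24.25 (item 102), normB=32.0 (item 106)
		System.out.println(similarity(12.0, 24.25, 32.0, 5));
		// prints approximately 0.149725067, matching 106 in the 102 row of pairwiseSimilarity above
	}
}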

(2) similarityMatrix

From the analysis in (1), the input to (2) is as follows:

{102={106:0.14972506706560876,105:0.14328432723886902,104:0.12789210656028413,103:0.1975496259559987},
103={106:0.1424339656566283,105:0.11208890297777215,104:0.14037600977966974},
101={107:0.10275248635596666,106:0.1424339656566283,105:0.1158457425543559,104:0.16015261286229274,103:0.15548737703860027,102:0.14201473202245876},
106={}, 
107={}, 
104={107:0.13472338607037426,106:0.18181818181818182,105:0.16736577623297264}, 
105={107:0.2204812092115424,106:0.14201473202245876}}

The earlier analysis of this job's mapper was correct, but the analysis of the merge method used by the combiner was not. The code of merge is as follows:

public static Vector merge(Iterable<VectorWritable> partialVectors) {
  Iterator<VectorWritable> vectors = partialVectors.iterator();
  Vector accumulator = vectors.next().get();
  while (vectors.hasNext()) {
    VectorWritable v = vectors.next();
    if (v != null) {
      Iterator<Vector.Element> nonZeroElements = v.get().iterateNonZero();
      while (nonZeroElements.hasNext()) {
        Vector.Element nonZeroElement = nonZeroElements.next();
        accumulator.setQuick(nonZeroElement.index(), nonZeroElement.get());
      }
    }
  }
  return accumulator;
}

As the code shows, its effect is to copy every non-zero value from all partial vectors sharing the same key into a single accumulator vector. The log information is as follows:

First, the map output (keys 101 to 103):


(keys 104 to 107):



The combiner's output:


With this data output in view, the combiner's actual behavior is easy to understand; a small sketch of the merge step follows below.
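
As an illustration, here is a minimal sketch of the merge step using the data for key 102 shown above, assuming that key receives two partial vectors: its own row from pairwiseSimilarity and the transposed entry from row 101. Merging them yields exactly the 102 row seen in similarityMatrix:

import java.util.Arrays;

import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;

public class MergeSketch {
	public static void main(String[] args) {
		// Partial vector 1: the 102 row from pairwiseSimilarity above.
		Vector partial1 = new RandomAccessSparseVector(Integer.MAX_VALUE);
		partial1.setQuick(106, 0.14972506706560876);
		partial1.setQuick(105, 0.14328432723886902);
		partial1.setQuick(104, 0.12789210656028413);
		partial1.setQuick(103, 0.1975496259559987);

		// Partial vector 2: the transposed entry 101 -> 102 from the 101 row.
		Vector partial2 = new RandomAccessSparseVector(Integer.MAX_VALUE);
		partial2.setQuick(101, 0.14201473202245876);

		// Vectors.merge copies every non-zero element of every partial vector
		// into one accumulator, exactly as the merge() code quoted above shows.
		Vector merged = Vectors.merge(Arrays.asList(
				new VectorWritable(partial1), new VectorWritable(partial2)));
		System.out.println(merged); // contains entries 101, 103, 104, 105 and 106
	}
}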

Finally, look at the reducer's operation, which should simply sort the combiner's output:


However, judging from the log information above, that does not seem to be the case. The Vectors.topKElements method has not been examined closely yet; it probably does something different from what was guessed. That will be looked at next time.


Share, grow, be happy

When reposting, please cite the blog address: http://blog.csdn.net/fansy1990

