Mapreduce -- PageRank

PageRank 简单理解为网页排名,但是网页是根据什么排名的,接下来就简单介绍一下。

举例:

假设网页 A 的内容中有网页 B,C 和 D 的链接,并且 A 的 PageRank的值为0.25。

那接下里我们就可以计算在网页 A 中的其他网页的PageRank的值了。我们拿网页 B 来进行说明,

在网页 A 中的网页 B 的 PageRank 为 0.25 * (1/n) 其中n为网页 A 中网页链接数,结果则为 0.25*(1/3)。

可以简单理解为A的PageRank被B,C 和 D 平分了,B分到了0.25的三分之一。

然后将所有网页中的关于网页B的pagerank值求出来,就是网页B真实的pagerank了。

但是上面的例子没有考虑到如下的特殊情况:

1 网页A中只有指向自己的网页链接。

2 网页A中没有任何链接。

如果出现以上情况,会导致pagerank结果不准确。

所以出现了下面的公式:

result = sum * n + (1-n)/N

sum 为上面计算出来的,如网页B在所有网页中的pagerank值的总和。

n 可以理解为停留在当前网页继续进行网页跳转浏览的概率

1-n 可以理解为不访问当前网页的任何链接,从浏览器的地址栏输入,转去其他网页的概率。

N 为网页的数量

下面介绍通过MapReduce实现PageRank

简单的流程分析:

Map

取一行数据进行说明

A    B    C    D

网页A中有网页B,C,D的链接;

刚开始给网页A一个默认的pagerank值,然后根据这个值计算其他网页链接在网页A中的PageRank。处理后的数据如下:

A    0.25    B    C    D
B    0.25*(1/3)
C    0.25*(1/3)
D    0.25*(1/3)

Reduce

然后通过 Reduce 计算出各个网页在其他网页中的 PageRank 的总和 sum,然后代入公式计算实际的PageRank,并更新 A 0.25 B C D 中的数据,这里将0.25更新为计算出来真实的 PageRank。

重复计算各个网页的 PageRank 的值,直到 PageRank 的结果收敛,值趋于稳定。

A    0.25    B    C    D  =>A    ?    B    C    D

测试数据:

A    B       C       D
B    A       D
C    C
D    B       C

测试代码:

RunJob.java

  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.fs.FileSystem;
  3 import org.apache.hadoop.fs.Path;
  4 import org.apache.hadoop.io.Text;
  5 import org.apache.hadoop.mapreduce.Job;
  6 import org.apache.hadoop.mapreduce.Mapper;
  7 import org.apache.hadoop.mapreduce.Reducer;
  8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
 10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 11 
 12 import java.io.IOException;
 13 
 14 /**
 15  * Created by Edward on 2016/7/13.
 16  */
 17 public class RunJob {
 18 
 19     public static enum ValueEnum{
 20         CLOSURE_VALUE;
 21     }
 22 
 23     public static void main(String[] args)
 24     {
 25 
 26         //access hdfs's user
 27         System.setProperty("HADOOP_USER_NAME","root");
 28 
 29         Configuration conf = new Configuration();
 30         conf.set("fs.defaultFS", "hdfs://node1:8020");
 31 
 32         try {
 33             int i = 0;
 34             while(true) {
 35                 i++;
 36                 conf.setInt("count", i);
 37 
 38                 FileSystem fs = FileSystem.get(conf);
 39                 Job job = Job.getInstance(conf);
 40                 job.setJarByClass(RunJob.class);
 41                 job.setMapperClass(MyMapper.class);
 42                 job.setReducerClass(MyReducer.class);
 43 
 44                 //需要指定 map out 的 key 和 value
 45                 job.setOutputKeyClass(Text.class);
 46                 job.setOutputValueClass(Text.class);
 47 
 48                 //设置输入类的
 49                 job.setInputFormatClass(KeyValueTextInputFormat.class);
 50                 if(i==1)
 51                     FileInputFormat.addInputPath(job, new Path("/test/pagerank/input"));
 52                 else
 53                     FileInputFormat.addInputPath(job, new Path("/test/pagerank/output/pr"+(i-1)));
 54 
 55                 Path path = new Path("/test/pagerank/output/pr"+i);
 56                 if (fs.exists(path))//如果目录存在,则删除目录
 57                 {
 58                     fs.delete(path, true);
 59                 }
 60                 FileOutputFormat.setOutputPath(job, path);
 61 
 62                 boolean b = job.waitForCompletion(true);
 63                 if (b) {
 64                     long closure =job.getCounters().findCounter(ValueEnum.CLOSURE_VALUE).getValue();
 65                     double avg= closure/4000.0;//计算收敛的平均值,浮动小于0.001则认为收敛
 66                     System.out.println("执行第"+i+"次, closure="+closure+",avg="+avg);
 67                     if(avg < 0.001){
 68                         System.out.println("总共执行了"+i+"次,之后收敛");
 69                         break;
 70                     }
 71                 }
 72             }
 73 
 74         } catch (Exception e) {
 75             e.printStackTrace();
 76         }
 77     }
 78 
 79 
 80     public static class MyMapper extends Mapper<Text, Text, Text, Text> {
 81         @Override
 82         protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
 83             AdjacentNodes adjacentNodes  = new AdjacentNodes();
 84             int count = context.getConfiguration().getInt("count", 1);
 85 
 86             if(count == 1)
 87             {
 88                 AdjacentNodes firstAdj = new AdjacentNodes();
 89                 firstAdj.setValue(1.0);
 90                 firstAdj.setNodes(value.toString().split("	"));
 91                 adjacentNodes = firstAdj;
 92             }
 93             else
 94             {
 95                 //格式化 value: 1.0 B C D
 96                 adjacentNodes.formatInfo(value.toString());
 97             }
 98             //A 1.0 B C D
 99             context.write(key, new Text(adjacentNodes.toString()));
100 
101             double pagerank = adjacentNodes.getValue()/adjacentNodes.getNum();
102             for(int i=0; i<adjacentNodes.getNum(); i++)
103             {
104                 String node = adjacentNodes.getNodes()[i];
105                 //B 0.333
106                 context.write(new Text(node), new Text(pagerank+""));
107             }
108         }
109     }
110 
111     public static class MyReducer extends Reducer<Text, Text, Text, Text> {
112         @Override
113         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
114 
115             AdjacentNodes adjacentNodes = new AdjacentNodes();
116 
117             Double sum = 0.0;
118             for(Text adj : values)
119             {
120                 String str = adj.toString();
121                 if(str.split("	").length>1)
122                 {
123                     adjacentNodes.formatInfo(str);
124                 }
125                 else{
126                     sum += Double.parseDouble(str);   //对节点的 pagerank 求和,
127                 }
128             }
129 
130             //计算pagerank
131             double n = 0.80;
132             double pagerank = sum * n + (1-n)/4.0;// 计算pagerank 公式 sum * n + (1-n)/N
133 
134             //计算收敛的差值
135             int closure =(int)(Math.abs(pagerank - adjacentNodes.getValue()) * 1000);
136             //通过context.getCounter(ENUM)方法,每次执行reduce将closure的值进行累加,结果传递给主函数,
137 
138             context.getCounter(ValueEnum.CLOSURE_VALUE).increment(closure);
139 
140             adjacentNodes.setValue(pagerank);
141             context.write(key, new Text(adjacentNodes.toString()));
142         }
143     }
144 }

AdjacentNodes.java
/**
 * Created by Edward on 2016/7/14.
 */
public class AdjacentNodes {

    double value = 0.0;
    int num = 0;
    String[] nodes = null;

    public void formatInfo(String str)
    {
        String[] val = str.split("	");
        this.setValue(Double.parseDouble(val[0]));
        this.setNum(val.length-1);

        if(this.num != 0)
            this.nodes = new String[this.num];

        for(int i=1; i<val.length; i++)
        {
            this.nodes[i-1] = val[i];
        }
    }

    public boolean isEmpty()
    {
        if(this.num == 0)
            return true;
        else
            return false;
    }

    public void setNodes(String[] nodes) {
        this.nodes = nodes;
        this.num = nodes.length;
    }

    public void setNum(int num) {
        this.num = num;
    }

    public void setValue(double value) {
        this.value = value;
    }

    public double getValue() {
        return value;
    }

    public int getNum() {
        return num;
    }

    public String[] getNodes() {
        return nodes;
    }


    @Override
    public String toString() {

        StringBuffer stringBuffer = new StringBuffer(this.value+"");

        for(int i=0; i<this.num; i++)
        {
            stringBuffer.append("	"+this.nodes[i]);
        }

        return stringBuffer.toString();
    }
}

结果数据:

A       0.10135294176208584     B       C       D
B       0.12838069628609527     A       D
C       0.6560527651143326      C
D       0.12838069628609527     B       C

总共执行了24次,之后收敛;

PageRank值越高,越值得推荐。

原文地址:https://www.cnblogs.com/one--way/p/5672207.html