mapreduce实现图的BFS遍历

一.MapReduce

 MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。分别有:

"Map(映射)"和"Reduce(归约)"

Map的作用是过滤一些原始数据,Reduce则是处理这些数据,得到我们想要的结果,

 

 

 

二.mapreduce实现图的BFS图示

要遍历的图:

输入样例:

1<tab>2,3|0|GRAY|source

2<tab>1,3,4,5|Integer.MAX_VALUE|WHITE|null

3<tab>1,4,2|Integer.MAX_VALUE|WHITE|null

4<tab>2,3|Integer.MAX_VALUE|WHITE|null

5<tab>2|Integer.MAX_VALUE|WHITE|null

1.中间输出1:

Reducer 1: (part-r-00000)

2<tab>1,3,4,5,|1|GRAY|1
5<tab>2,|Integer.MAX_VALUE|WHITE|null

Reducer 2: (part-r-00001)

3<tab>1,4,2,|1|GRAY|1

Reducer 3: (part-r-00002)

1<tab>2,3,|0|BLACK|source
4<tab>2,3,|Integer.MAX_VALUE|WHITE|null


2.中间输出2:

Reducer 1: (part-r-00000)

2<tab>1,3,4,5,|1|BLACK|1
5<tab>2,|2|GRAY|2

Reducer 2: (part-r-00001)

3<tab>1,4,2,|1|BLACK|1

Reducer 3: (part-r-00002)

1<tab>2,3,|0|BLACK|source
4<tab>2,3,|2|GRAY|2

          

            

 

3.最终输出:


Reducer 1: (part-r-00000)

2<tab>1,3,4,5,|1|BLACK|1
5<tab>2,|2|BLACK|2

Reducer 2: (part-r-00001)

3<tab>1,4,2,|1|BLACK|1

Reducer 3: (part-r-00002)

1<tab>2,3,|0|BLACK|source
4<tab>2,3,|2|BLACK|2

      

 

三.mapreduce实现图的BFS代码

  1.Node.java

 

    import java.util.*;
     
    import org.apache.hadoop.io.Text;

 
    public class Node {
     
        // 标识节点是第几次访问
        public static enum Color {
            WHITE, GRAY, BLACK
        };
     
        private String id; 
        private int distance; // 到出发点的距离
        private List<String> edges = new ArrayList<String>(); // 边集
        private Color color = Color.WHITE;
        private String parent; //父节点
     
        public Node() {
           
            edges = new ArrayList<String>();
            distance = Integer.MAX_VALUE;
            color = Color.WHITE;
            parent = null;
        }
     
        public Node(String nodeInfo) {
     
            String[] inputLine = nodeInfo.split("	"); 
            String key = "", value = ""; 
     
            try {
                key = inputLine[0]; // 节点id
                value = inputLine[1]; //邻节点list
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
     
            }
     
            String[] tokens = value.split("\|"); // /tokens[0] = 邻节点list, tokens[1]= 距离, tokens[2]= 颜色, tokens[3]= 父节点
     
            this.id = key;
     
            //设置边节点
            for (String s : tokens[0].split(",")) {
                if (s.length() > 0) {
                    edges.add(s);
                }
            }
     
            // 设置距离
            if (tokens[1].equals("Integer.MAX_VALUE")) {
                this.distance = Integer.MAX_VALUE;
            } else {
                this.distance = Integer.parseInt(tokens[1]);
            }
     
            // 设置颜色
            this.color = Color.valueOf(tokens[2]);
     
            // 设置父节点
            this.parent = tokens[3];
     
        }
     
        public Text getNodeInfo() {
            StringBuffer s = new StringBuffer();
     
         
            try {
                for (String v : edges) {
                    s.append(v).append(",");
                }
            } catch (NullPointerException e) {
                e.printStackTrace();
                System.exit(1);
            }
     
            s.append("|");

            if (this.distance < Integer.MAX_VALUE) {
                s.append(this.distance).append("|");
            } else {
                s.append("Integer.MAX_VALUE").append("|");
            }
     

            s.append(color.toString()).append("|");
     

            s.append(getParent());
     
            return new Text(s.toString());
        }

    }

 

 

 2.SearchMapper

 

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Mapper.Context;
     
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.LongWritable;

    public class SearchMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context, Node inNode)
                throws IOException, InterruptedException {
     
         
            if (inNode.getColor() == Node.Color.GRAY) {
                for (String neighbor : inNode.getEdges()) { 
     
                    Node adjacentNode = new Node(); 
     
                    adjacentNode.setId(neighbor); 
                    adjacentNode.setDistance(inNode.getDistance() + 1); 
                    context.write(new Text(adjacentNode.getId()), adjacentNode.getNodeInfo());
     
                }

                inNode.setColor(Node.Color.BLACK);
            }

            context.write(new Text(inNode.getId()), inNode.getNodeInfo());
     
        }
    }

 

3.SearchReducer

 

 

 public class SearchReducer extends Reducer<Text, Text, Text, Text> {
     
        public Node reduce(Text key, Iterable<Text> values, Context context, Node outNode)
                throws IOException, InterruptedException {

            outNode.setId(key.toString());

            for (Text value : values) {
     
                Node inNode = new Node(key.toString() + "	" + value.toString());

                if (inNode.getEdges().size() > 0) {
                    outNode.setEdges(inNode.getEdges());
                }
                   
               
                if (inNode.getDistance() < outNode.getDistance()) {
                    outNode.setDistance(inNode.getDistance());

                    outNode.setParent(inNode.getParent());
                }

                if (inNode.getColor().ordinal() > outNode.getColor().ordinal()) {
                    outNode.setColor(inNode.getColor());
                }        
               
            }

            context.write(key, new Text(outNode.getNodeInfo()));   
         
            return outNode;
        }
    }

 4.SearchMapperSSSP.java

 

public static class SearchMapperSSSP extends SearchMapper {
 
       
        public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
       
            Node inNode = new Node(value.toString());

            super.map(key, value, context, inNode);
 
        }
    }

 5.SearchReducerSSSP.java

public static class SearchReducerSSSP extends SearchReducer{
 
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

            Node outNode = new Node();

            outNode = super.reduce(key, values, context, outNode);                                     

            if (outNode.getColor() == Node.Color.GRAY)

                context.getCounter(MoreIterations.numberOfIterations).increment(1L);
    }
}

6.BaseJob.java

 

 private Job getJobConf(String[] args) throws Exception {
 
        JobInfo jobInfo = new JobInfo() {
            @Override
            public Class<? extends Reducer> getCombinerClass() {
                return null;
            }
 
            @Override
            public Class<?> getJarByClass() {
                return SSSPJob.class;
            }
 
            @Override
            public Class<? extends Mapper> getMapperClass() {
                return SearchMapper.class;
            }
 
            @Override
            public Class<?> getOutputKeyClass() {
                return Text.class;
            }
 
            @Override
            public Class<?> getOutputValueClass() {
                return Text.class;
            }
 
            @Override
            public Class<? extends Reducer> getReducerClass() {
                return SearchReducer.class;
            }
        };
       
        return setupJob("ssspjob", jobInfo);
 
       
    }
 
    public int run(String[] args) throws Exception {
 
        int iterationCount = 0; 
 
        Job job;
       
        long terminationValue =1;
       
        while( terminationValue >0){
 
            job = getJobConf(args);
   
            if (iterationCount == 0) 
                input = args[0];
            else
              
                input = args[1] + iterationCount;
 
            output = args[1] + (iterationCount + 1); 
 
            FileInputFormat.setInputPaths(job, new Path(input));

            FileOutputFormat.setOutputPath(job, new Path(output));
 
            job.waitForCompletion(true);
 
            Counters jobCntrs = job.getCounters();
            terminationValue = jobCntrs.findCounter(MoreIterations.numberOfIterations).getValue();
            iterationCount++;
       
        }
 
        return 0;
    }
 
    public static void main(String[] args) throws Exception {
 
        int res = ToolRunner.run(new Configuration(), new SSSPJob(), args);
        if(args.length != 2){
            System.err.println("Usage: <in> <output name> ");
        }
        System.exit(res);
    }

 

 

 

原文地址:https://www.cnblogs.com/xmeo/p/6683914.html