MapReduce 矩阵相乘

对于矩阵A[mn]*B[nl]=C[ml]。这里可以并行起来的就是每个Cij,对于Cij而言,他是由A的第i行和B的第j列相乘得到。由于大的矩阵中经常是稀疏矩阵,所以一般用行列值表示
例如对于A:
1 2 3
4 5 0
7 8 9
10 11 12
他的行列值表为:
1 1 1
1 2 2
1 3 3
2 1 4
2 2 5
3 1 7
3 2 8
3 3 9
4 1 10
4 2 11
4 3 12
对于B:
10 15
0 2
11 9
行列值:
1 1 10
1 2 15
2 2 2
3 1 11
3 2 9
我们以计算C11为例,这里需要用到A的行列值中行为1的数据和B行列值中列为1的数据。我们的想法是每个Reduce计算一个Cij,并且map输出的key为Cij的i,j,value为A的第i行,和B的第j列。那么整体的框架就是在map中每次读入一行数据,如果是从mat_a.txt中读入的就是A,如果是从mat_b.txt中读入的那就是B的一个数据,同时我们发现对于A11他会在计算C11和C12的时候都被用到,其实是A的第一行数据的时候会在计算C第一行数据的时候被用到,那么我们需要对每个Aij数据进行多次分发,分发的次数为B的列数,例如A11需要分发<1,1>,<1,2>规律就是A的<row,i>i为B的列数,同理可得对于B11来说会在求C11,C21,C31,C41的时候被使用,所以每个C的数据需要被分发4次,规律为<j,col>j为A的行数。这样通过map之后就可以获得每个Cij所需要的数据了。
例如对于C1,2经过map后的数据为
a 1 1
a 2 2
a 3 3
b 1 15
b 2 2
b 3 9
接着为了减少空间,在reduce中,创建了一个class,用与存放下标和数据。
class A implements Comparable<A>{
                 public int index;
                 public int value;
                A( int index, int value){
                                 this .index =index;
                                 this .value =value;
                }              
                 public int compareTo(A a){
                                 if (index <a. index) return -1;
                                 else return 1;
                }
                
}
并且创建两个实例list1和list2,1用于放A的数据,2用于放B的数据。经过排序后(一开始没有排序,导致出错),就遍历这两个,由于对于0值这里没有存放数据,所以在相乘的条件是两者的index相同。自己在写的时候在这个地方出现了一个逻辑上的错误,就是在遍历两个list,并让他们在index相等的时候相乘。
                         int j=0;
                                 for (int i=0;i<list1.size();i++){
                                                 if (list1.get(i).index ==list2.get(j). index){
                                                                sum=sum+list1.get(i). value *list2.get(j).value ;
                                                                j++;
                                                }if (list1.get(i). index>list2.get(j). index ){
                                                                j++;
                                                }
                                }
这是一开始写的,发现一直报下标出错,后来发现原来是第二个if的问题,例如此时的list1是
1 1
2 2
3 3
list2 :
1 10
3 2
在第一个if判断后,j++了结果后进行了下一个if。而应该是else if。一个疏忽大意。
具体代码:
Map
public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
                 public static int MATRIX_I =4;
                 public static int MATRIX_J =3;
                 public static int MATRIX_K =2;
                 public void map(LongWritable ikey, Text ivalue, Context context)
                                                 throws IOException, InterruptedException {
                                String filename = ((FileSplit) context.getInputSplit()).getPath().getName();
                                String line=ivalue.toString();
                                 int row=Integer.parseInt( line.split( " ")[0]);
                                 int col=Integer.parseInt( line.split( " ")[1]);
                                 int num=Integer.parseInt( line.split( " ")[2]);
                                 if (filename.compareTo("mat_a.txt" )==0){
                                                 for (int i=1;i<= MATRIX_K;i++){
                                                                String key="<" +row+ ":"+i+ ">" ;
                                                                String value="a" + " "+col+ " " +num;
                                                                context.write( new Text(key), new Text(value));
                                                }
                                                
                                } else if (filename.compareTo( "mat_b.txt")==0){
                                                 for (int i=1;i<= MATRIX_I;i++){
                                                                String key="<" +i+ ":"+col+ ">" ;
                                                                String value="b" + " "+row+ " " +num;
                                                                context.write( new Text(key), new Text(value));
                                                }
                                }
                                
                                
                }
 
}
 
Reduce:
 
 
                 public void reduce(Text _key, Iterable<Text> values, Context context)
                                                 throws IOException, InterruptedException {
                                 // process values
                                 int n=0;
                                ArrayList<A> list1= new ArrayList<A>();
                                ArrayList<A> list2= new ArrayList<A>();
                                System. out .println("                            " );
                                System. out .println("key is " +_key.toString());
                                System. out .println("                            " );
                                 for (Text val : values) {
                                                String[] line=val.toString().split(" " );
                                                 /*
                                                System.out.println("                            ");
                                                System.out.println("line is "+line[0]+" "+line[1]+" "+line[2]);
                                                System.out.println("                            ");
                                                */
                                                A a=new A(Integer.parseInt(line[1]),Integer. parseInt(line[2]));
                                                 if (line[0].compareTo("a" )==0){
                                                                System. out .println("                            " );
                                                                System. out .println("this is a" );
                                                                System. out .println("                            " );
                                                                list1.add(a);
                                                } else if (line[0].compareTo( "b")==0){
                                                                System. out .println("                            " );
                                                                System. out .println("this is b" );
                                                                System. out .println("                            " );
                                                                list2.add(a);
                                                }
                                }
                                 int sum=0;
                                 int j=0;
 
                                Collections. sort(list1);
                                Collections. sort(list2);
                                 for (A a:list1){
                                                System. out .println("list1 is " +a. index+ " " +a.value );
                                }
                                System. out .println("                            " );
                                 for (A a:list2){
                                                System. out .println("list2 is " +a. index+ " " +a.value );
                                }
                                 for (int i=0;i<list1.size();i++){
                                                 if (list1.get(i).index ==list2.get(j). index){
                                                                sum=sum+list1.get(i). value *list2.get(j).value ;
                                                                j++;
                                                } else if (list1.get(i). index>list2.get(j). index ){
                                                                j++;
                                                }
                                }
                                System. out .println("                            " );
                                System. out .println("sum is " +sum);
                                System. out .println("                            " );
                                context.write(_key, new Text(String.valueOf(sum)));
                                
                }
 
}
原文地址:https://www.cnblogs.com/sunrye/p/4544773.html