矩阵乘法在hadoop的实现

先随机生成两个矩阵M和N:M的行数、M的列数(同时也是N的行数)以及N的列数由用户通过三个命令行参数依次输入:

#!/bin/bash
# Generate two random integer matrices as input for the matrix-multiplication job.
#
# Usage: <script> <rowsM> <colsM> <colsN>
#   Writes M_<rowsM>_<colsM> and N_<colsM>_<colsN> in the current directory.
#   Each line holds one element in the form "row,col value", with 1-based
#   indices and a value in the range 0..99.

if [ "$#" -ne 3 ]; then
  echo "usage: $0 <rowsM> <colsM> <colsN>" >&2
  exit 1
fi

m_file="M_${1}_${2}"
n_file="N_${2}_${3}"

# Start from empty files: the loops below append, so without truncation a
# second run with the same dimensions would leave duplicate entries behind.
: > "$m_file"
: > "$n_file"

# Matrix M: $1 rows by $2 columns.
for i in $(seq 1 "$1")
do
  for j in $(seq 1 "$2")
  do
    s=$((RANDOM % 100))
    echo "$i,$j $s" >> "$m_file"
  done
done

# Matrix N: $2 rows by $3 columns (its row count equals M's column count).
for i in $(seq 1 "$2")
do
  for j in $(seq 1 "$3")
  do
    s=$((RANDOM % 100))
    echo "$i,$j $s" >> "$n_file"
  done
done

执行上面的shell脚本,即可生成两个矩阵文件(M_行数_列数 和 N_行数_列数),文件中每一行的格式为"行号,列号 元素值"。矩阵的乘法在hadoop的并行化实现,其基本的实现思路如下:

map:对于矩阵M的每一个元素mij,产生一系列的key-value对<(i,k),(M,j,mij)>,其中k=1,2,…,N的列数;对于矩阵N的每个元素njk,产生一系列的key-value对<(i,k),(N,j,njk)>,其中i=1,2,…,M的行数;

reduce:对于每个键(i,k)相关联的值(M,j,mij)和(N,j,njk),根据j值将mij和njk分别存入两个数组的第j个位置,然后将两个数组中下标相同的第j个元素抽取出来分别相乘,最后把所有乘积相加,即可得到Pik的值。

下面给出hadoop的map和reduce代码的实现。

public static class MatrixMapper extends Mapper<Object,Text,Text,Text>{

      private Text map_key=new Text();

   private Text map_value=new Text();

    public void setup(Context context) throws IOException{

   Configuration conf=context.getConfiguration();

  columnN=Integer.parseInt(conf.get("columnN"));

  rowM=Integer.parseInt(conf.get("rowM"));

  }

     public void map(Object key,Text value,Context context) throws IOException,InterruptedException{

  FileSplit fileSplit=(FileSplit )context.getInputSplit();

   String fileName=fileSplit.getPath().getName();

  String[] tuple=value.toString().split(",");

  if(fileName.startWith("M")){

    int i=Integer.parseInt(tuple[0]);

    String tuples=tuple[1].split(" ");

    int j=Integer.parseInt(tuples[0]);

    int Mij=Integer.parseInt(tuples[1]);

    for(int k=1;k<columnN+1;k++){

    map_key.set(i+","+k);

    map_value.set("M"+","+j+","+Mij);

    context.write(map_key,map_value);

    }

  }else if(fileName.startWith("N")){

    int j=Integer.parseInt(tuple[0]);

    String tuples=tuple[1].split(" ");

    int k=Integer.parseInt(tuples[0]);

    int Njk=Integer.parseInt(tuples[1]);

    for(int i=1;i<rowM+1;i++){

    map_key.set(i+","+k);

    map_value.set("N"+","+j+","+Njk);

    context.write(map_key,map_value);

  }

  }

}

 通过map函数的处理,对每一行数据,根据间隔符进行分割。这样就得到形如<(2,2),(M,3,7)>这种格式的键值对,从而方便在reduce函数相乘。reduce的函数如下:

public static class MatrixReducer extends Reducer<Text,Text,Text,Text>{

  private int sum=0;

  

  public void setup(Context context) throws IOException{

     Configuration conf=context.getConfiguration();

    columnM=Integer.parseInt(conf.get("columnM"));

   }

  public void reduce(Text,key,Iterable<Text> values,Context context) throws IOException,InterruptedException{

    int[] M=new int[columnM+1];

    int[] N=new int[columnM+1];

    for(Text val:values){

      String[] tuple=val.toString().split(",");

      if(tuple[0].equals("M")){

        M[Integer.parseInt(tuple[1])]=Integer.parseInt(tuple[2]);

      }else{

        N[Integer.parseInt(tuple[1])]=Integer.parseInt(tuple[2]);

      }

    }

    //矩阵乘法

    for(int j=1;j<columnM+1;j++){

      sum+=M[j]*N[j];

    }

    context.write(key,new Text(sum));

    sum=0;

  }

}

原文地址:https://www.cnblogs.com/hd-zg/p/5905655.html