DataJoin: Replicated join using DistributedCache

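A reduce-side join is flexible but inefficient: the join does not happen until the reduce phase, so all of the data has to be shuffled across the network, and much of it is then thrown away during the join because it has no match. Performing the join on the map side would be more efficient, but a mapper cannot easily get hold of all the records it needs to join at the same time. In practice, the data sources being joined are often one very large and one fairly small. If the small source is small enough to fit in a mapper's memory and is copied to every machine that runs mappers, the join can be done on the map side. This is the replicated join.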
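
Stripped of Hadoop, the idea is an in-memory hash join. The following is only a minimal sketch under assumptions: the class name ReplicatedJoinSketch, the helper names buildLookup and join, and the sample rows are all made up for illustration, and each row is assumed to be a {joinKey, value} pair. The small source is loaded into a map once, and the large source is then streamed past it record by record, which is the role each mapper plays.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a replicated (map-side) join without Hadoop.
class ReplicatedJoinSketch {

    // Load the small data source into memory: join key -> value.
    static Map<String, String> buildLookup(List<String[]> smallRows) {
        Map<String, String> lookup = new HashMap<>();
        for (String[] row : smallRows) {
            lookup.put(row[0], row[1]);
        }
        return lookup;
    }

    // Stream over the large source once and emit only the matching records
    // (an inner join). No shuffle is needed because every lookup is local.
    static List<String> join(List<String[]> largeRows, Map<String, String> lookup) {
        List<String> joined = new ArrayList<>();
        for (String[] row : largeRows) {
            String match = lookup.get(row[0]);
            if (match != null) {
                joined.add(row[0] + "\t" + row[1] + "," + match);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Made-up sample data: a small "customers" source and a larger "orders" source.
        List<String[]> customers = Arrays.asList(
                new String[] {"1", "Alice"},
                new String[] {"2", "Bob"});
        List<String[]> orders = Arrays.asList(
                new String[] {"1", "order-A"},
                new String[] {"3", "order-B"},
                new String[] {"2", "order-C"});

        for (String line : join(orders, buildLookup(customers))) {
            System.out.println(line);
        }
    }
}
```

The trade-off is memory: the whole small source lives in every mapper, so this only works while that source stays comfortably below the mapper's heap size.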

Hadoop has a mechanism called the distributed cache that is designed to distribute files to all nodes in a cluster. It is normally used for distributing files containing "background" data needed by all mappers. For example, if you are using Hadoop to classify documents, you may have a list of keywords for each class. You would use the distributed cache to ensure that all mappers have access to the lists of keywords, the "background" data.

org.apache.hadoop.filecache.DistributedCache

There are two steps to using this class. First, when configuring a job, you call the static method DistributedCache.addCacheFile() to specify the files to be disseminated to all nodes. These files are specified as URI objects, and they default to HDFS unless a different filesystem is specified. The JobTracker will take this list of URIs and create a local copy of the files in all the TaskTrackers when it starts the job. In the second step, your mappers on each individual TaskTracker will call the static method DistributedCache.getLocalCacheFiles() to get an array of local file Paths where the local copy is located. At this point the mapper can use standard Java file I/O techniques to read the local copy.

The implementation is as follows.

Add this when configuring the job:

  DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());  // with the old API, pass conf directly

MapClass:

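import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Hashtable;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Text input keys assume an input format that produces them, e.g. KeyValueTextInputFormat.
public class MapClass extends Mapper<Text, Text, Text, Text> {

    // In-memory copy of the small data source: join key -> value
    private Hashtable<String, String> hashTable = new Hashtable<String, String>();

    // Runs once per mapper before any call to map(): load the cached file into memory.
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        if (cacheFiles != null && cacheFiles.length > 0) {
            String line;
            String[] tokens;
            BufferedReader br = new BufferedReader(new FileReader(cacheFiles[0].toString()));
            try {
                while ((line = br.readLine()) != null) {
                    // Each line is expected to be "key<TAB>value"
                    tokens = line.split("\t", 2);
                    hashTable.put(tokens[0], tokens[1]);
                }
            } finally {
                br.close();
            }
        }
    }

    // Inner join: emit a record only when its key exists in the cached table.
    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        String joinValue = hashTable.get(key.toString());
        if (joinValue != null) {
            context.write(key, new Text(value.toString() + "," + joinValue));
        }
    }
}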
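
One detail of the loading step is worth a note: split("\t", 2) returns a single-element array for a line that contains no tab, so reading tokens[1] from a blank or malformed line throws ArrayIndexOutOfBoundsException and fails the task. Below is a hedged sketch of a more tolerant loader. The class name LookupLoaderSketch and the method load are hypothetical, and it reads from any Reader rather than from the cache file so that it can be run without a cluster.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: parse "key<TAB>value" lines, skipping lines that do not fit.
class LookupLoaderSketch {

    static Map<String, String> load(Reader source) {
        Map<String, String> table = new HashMap<>();
        // try-with-resources closes the reader even if parsing fails
        try (BufferedReader br = new BufferedReader(source)) {
            String line;
            while ((line = br.readLine()) != null) {
                // Limit 2: only the first tab separates key from value
                String[] tokens = line.split("\t", 2);
                if (tokens.length < 2 || tokens[0].isEmpty()) {
                    continue; // blank line, no tab, or empty key: skip it
                }
                table.put(tokens[0], tokens[1]);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return table;
    }

    public static void main(String[] args) {
        // Made-up input with one malformed line and one blank line
        String sample = "a\t1\nbroken\n\nb\tx\ty\n";
        System.out.println(load(new StringReader(sample)));
    }
}
```

Inside setup(), the same method could be fed a FileReader opened on the local cache path. Whether to skip bad lines silently, count them, or fail fast is a design choice that depends on how much the lookup data is trusted.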

Original post: https://www.cnblogs.com/liangzh/p/2508946.html