浅谈hadoop中mapreduce的文件分发

近期在做数据分析的时候。须要在mapreduce中调用c语言写的接口。此时就须要把动态链接库so文件分发到hadoop的各个节点上,原来想自己来做这个分发,大概过程就是把so文件放在hdfs上面,然后做mapreduce的时候把so文件从hdfs下载到本地,但查询资料后发现hadoop有对应的组件来帮助我们完毕这个操作,这个组件就是DistributedCache,分布式缓存,运用这个东西能够做到第三方文件的分发和缓存功能,以下具体解释:


假设我们须要在map之间共享一些数据,假设信息量不大,我们能够保持在conf中。可是假设我们须要共享一些配置文件,jar包之类的。此时DistributedCache能够满足我们的需求,使用DistributedCache的过程例如以下:

1.正确配置被分发的文件的路径(hdfs上的路径)

2.在自己定义的mapper或reducer中获取文件下载到本地后的路径(linux文件系统路径)。通常是重写configure或者重写setup(新方式)

3.在自己定义的mapper或reducer类中读取这些文件的内容

以下以代码说明三个步骤:

1.Configuration conf = new Configuration();

  DistributedCache.addCacheFile(new URI("/user/tinfo/zhangguochen/libJMeshCalc.so"), conf);

  DistributedCache.addCacheArchive(new URI("/user/tinfo/zhangguochen/libJMeshCalc.zip"),conf);
  DistributedCache.addFileToClassPath(new URI("/user/tinfo/zhangguochen/libMeshCal.jar"), conf);

    或者

    conf.set("mapred.cache.files", "/myapp/file");

   conf.set("mapred.cache. archives", "/mayapp/file.zip");

   以上是配置须要分发的hdfs上的文件,可是前提是这些文件必须在hdfs上存在,看源代码可知道DistributedCache的静态方法事实上就是封装了conf.set的动作。

2.在自己的mapper类中,使用DistributedCache获取下载到本地的文件。大部分情况下这些操作都是重写configure接口(或者setup),然后把本地文件路径保存在mapper类的成员变量中。供map方法使用。代码例如以下:

   private Path[] localFiles;

   public void setup(Context context) {

       localFiles = DistributeCache.getLocalCacheFiles(context.getConfiguration());

       for(Path temp:localFiles) {

            String path = temp.toString();//path就是此文件在本地的路径

            if(path.contains("myfileName")) {//获取到自己须要的文件

            }

       }

    }

getLocalCacheFiles返回的是数组(元素类型是Path),数组内容是这个task(map或reduce)所属的job设定的全部须要被分发的文件,假设设置了     多个文件。能够遍历Path数组,用String.contains("KeyWord")来推断是否是你所须要的文件。


   获取压缩包的路径

   private  File[] inputFiles;

   private Path[] localArchives;

   public void setup(Context context) {

       localArchives = DistributeCache.getLocalCacheArvhives();

       for(Path archive : localArchives) {

            if(archive.toString.contains("mytarName")) {//找到自己须要的文件

                 inputFiles = new File(archive.toString()).listFiles();//获取压缩包下的全部 文件

            }

       }

   }

    也能够用DistributedCache将所使用到的第三方jar包载入到classpath中DistributedCache.addFileToClassPath

  

   通过以上代码发现假设要使用这些分发到各个节点上的文件操作比較复杂,DistributedCache也提供一种更方便的使用方法。即能够为每个分发的文件创建一个符号链接,然后hadoop就会在当前mapreduce的运行路径下创建一个到源文件的链接,我们就能够在mapreduce中直接使用这些文件,而不必关心这些文件在本地的路径。

  演示样例:

  1.把文件分发到缓存中

   Configuration conf = new Configuration();

   DistributedCache.createSymlink(conf);//创建符号链接
   DistributedCache.addCacheFile(new URI("/user/tinfo/zhangguochen/file1#myfile"), conf);//增加分布式缓存,myfile是符号


   2.在mapreduce中使用

    public void setup(Context context) {

       File myfile = new File("myfile");//在这里就能够直接通过符号myfile使用此文件

    }

   或者用下面方式:

    conf.set("mapred.cache.files", "/data/data#mData");
     conf.set("mapred.cache.archives", "/data/data.zip#mDataZip");
     conf.set("mapred.create.symlink", "yes"); // 是yes。不是true
     DistributedCache.createSymlink(Configuration)
     在map阶段,仅仅须要File file = new File("mData");就可以获得该文件……

下面资料来自网络。如有雷同,纯属意外

DistributedCache是Hadoop提供的文件缓存工具。它能够自己主动将指定的文件分发到各个节点上,缓存到本地,供用户程序读取使用。它具有下面几个特点:缓存的文件是仅仅读的,改动这些文件内容没有意义;用户能够调整文件可见范围(比方仅仅能用户自己使用,全部用户都能够使用等),进而防止反复拷贝现象;按需拷贝,文件是通过HDFS作为共享数据中心分发到各节点的,且仅仅发给任务被调度到的节点。本文将介绍DistributedCache在Hadoop 1.0和2.0中的用法及实现原理。

Hadoop DistributedCache有下面几种典型的应用场景:1)分发字典文件。一些情况下Mapper或者Reducer须要用到一些外部字典。比方黑白名单、词表等;2)map-side join:当多表连接时。一种场景是一个表非常大,一个表非常小。小到足以载入到内存中,这时能够使用DistributedCache将小表分发到各个节点上,以供Mapper载入使用;3)自己主动化软件部署:有些情况下,MapReduce需依赖于特定版本号的库,比方依赖于某个版本号的PHP解释器,一种做法是让集群管理员把这个版本号的PHP装到各个机器上,这通常比較麻烦,还有一种方法是使用DistributedCache分发到各个节点上,程序执行完后,Hadoop自己主动将其删除。


Hadoop提供了两种DistributedCache使用方式。一种是通过API。在程序中设置文件路径,第二种是通过命令行(-files。-archives或-libjars)參数告诉Hadoop,个人建议使用第二种方式。该方式可使用下面三个參数设置文件:

(1)-files:将指定的本地/hdfs文件分发到各个Task的工作文件夹下。不正确文件进行不论什么处理;

(2)-archives:将指定文件分发到各个Task的工作文件夹下,并对名称后缀为“.jar”、“.zip”。“.tar.gz”、“.tgz”的文件自己主动解压,默认情况下。解压后的内容存放到工作文件夹下名称为解压前文件名称的文件夹中,比方压缩包为dict.zip,则解压后内容存放到文件夹dict.zip中。为此,你能够给文件起个别名/软链接,比方dict.zip#dict,这样,压缩包会被解压到文件夹dict中。

(3)-libjars:指定待分发的jar包,Hadoop将这些jar包分发到各个节点上后,会将其自己主动加入到任务的CLASSPATH环境变量中。

hadoop jar xxx.jar -files hdfs://xxx/xx

hadoop jar xxx.jar -libjars hdfs://xxx/xxx.jar,hdfs://xxx/xx2.jar


前面提到。DistributedCache分发的文件是有可见范围的。有的文件能够仅仅对当前程序可见,程序执行完后,直接删除;有的文件仅仅对当前用户可见(该用户全部程序都能够訪问)。有的文件对全部用户可见。DistributedCache会为每种资源(文件)计算一个唯一ID,以识别每一个资源,从而防止资源反复下载。举个样例。假设文件可见范围是全部用户。则在每一个节点上,第一个使用该文件的用户负责缓存该文件,之后的用户直接使用就可以,无需反复下载。那么。Hadoop是如何区分文件可见范围的呢?

在Hadoop 1.0版本号中。Hadoop是以HDFS文件的属性作为标识推断文件可见性的,须要注意的是,待缓存的文件即使是在Hadoop提交作业的client上。也会首先上传到HDFS的某一文件夹下,再分发到各个节点上的,因此。HDFS是缓存文件的必经之路。

对于常常使用的文件或者字典。建议放到HDFS上,这样能够防止每次反复下载,做法例如以下:

比方将数据保存在HDFS的/dict/public文件夹下。并将/dict和/dict/public两层文件夹的可运行权限全部打开(在Hadoop中。可运行权限的含义与linux中的不同,该权限仅仅对文件夹有意义,表示能够查看该文件夹中的子文件夹),这样,里面全部的资源(文件)便是全部用户可用的,而且第一个用到的应用程序会将之缓存到各个节点上,之后全部的应用程序无需反复下载。能够在提交作业时通过下面命令指定:

-files hdfs:///dict/public/blacklist.txt, hdfs:///dict/public/whilelist.txt

假设有多个HDFS集群能够指定namenode的对外rpc地址:

-files hdfs://host:port/dict/public/blacklist.txt, hdfs://host:port/dict/public/whilelist.txt

DistributedCache会将blacklist.txt和whilelist.txt两个文件缓存到各个节点的一个公共文件夹下。并在须要时,在任务的工作文件夹下建立一个指向这两个文件的软连接。

假设可运行权限没有打开。则默认仅仅对该应用程序的拥有者可见,该用户全部应用程序可共享这些文件。

一旦你对/dict/public下的某个文件进行了改动,则下次有作业用到相应文件时,会发现文件被改动过了,进而自己主动又一次缓存文件。

对于一些频繁使用的字典,不建议存放在client。每次通过-files指定,这种文件,每次都要经历下面流程:上传到HDFS上—》缓存到各个节点上—》之后不再使用这些文件,直到被清除,也就是说,这种文件。仅仅会被这次执行的应用程序使用,假设再次执行相同的应用程序,即使文件没有被改动。也会又一次经历以上流程。很耗费时间,尤其是字典许多,很大时。

DistributedCache内置缓存置换算法。一旦缓存(文件数目达到一定上限或者文件总大小超过某一上限)满了之后。会踢除最久没有使用的文件。

在Hadopo 2.0中。自带的MapReduce框架仍支持1.0的这样的DistributedCache使用方式。但DistributedCache本身是由YARN实现的。不再集成到MapReduce中。

YARN还提供了非常多相关编程接口供用户调用,有兴趣的能够阅读源码。

以下介绍Hadoop 2.0中。DistributedCache通过命令行分发文件的基本使用方式:

(1)执行Hadoop自带的example样例, dict.txt会被缓存到各个Task的工作文件夹下,因此,直接像读取本地文件一样,在Mapper和Reducer中,读取dict.txt就可以:

1
2
3
4
5
6
bin/Hadoopjar
share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar
wordcount
-files hdfs:///dict/public/dict.txt
/test/input
/test/output

(2)Hadoop Streaming样例,须要通过-files指定mapper和reducer可运行文件或者脚本文件,这些文件就是通过DistributedCache分发到各个节点上的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/bin/bash
HADOOP_HOME=/opt/yarn-client
INPUT_PATH=/test/input/data
OUTPUT_PATH=/test/output/data
echo"Clearing output path: $OUTPUT_PATH"
$HADOOP_HOME/bin/hadoopfs -rmr $OUTPUT_PATH
 
${HADOOP_HOME}/bin/hadoopjar
   ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar
  -D mapred.reduce.tasks=2
  -files mapper,reducer
  -input $INPUT_PATH
  -output $OUTPUT_PATH
  -mapper mapper
  -reducer reducer

(3)接下给出一个缓存压缩文件的样例。如果压缩文件为dict.zip。里面存的数据为:

1
2
3
4
data/1.txt
data/2.txt
mapper.list
reducer.list

通过-archives參数指定dict.zip后。该文件被解压后,将被缓存(实际上是软连接)到各个Task的工作文件夹下的dict.zip文件夹下,组织结构例如以下:

1
2
3
4
5
6
dict.zip/
    data/
        1.txt
        2.txt
    mapper.list
    reducer.list

你能够在Mapper或Reducer程序中。使用类似以下的代码读取解压后的文件:

 

1
2
3
File file2 = read(“dict.zip/data/1.txt”, “r”);
…….
File file3 = read(“dict.zip/mapper.list”, “r”);

假设你想直接将内容解压到Task工作文件夹下,而不是子文件夹dict.zip中。能够用“-files”(注意,不要使用-archives,“-files”指定的文件不会被解压)指定dict.zip,并自己在程序中实现解压缩:

1
2
3
4
#include <cstdlib>
…….
system(“unzip –q dict.zip”); //C++代码
……

总之,Hadoop DistributedCache是一个很好用的工具,合理的使用它可以解决许多很困难的问题。 

   总结下面:假设mr程序中须要第三方jar包,能够通过在程序中使用DistributedCache,也能够在命令中使用-libjars来实现,可是这些引入的jar都仅仅能够在mr任务启动之后来使用。假设你在启动MR任务之前调用了第三方jar包的类,那这就会有问题,会在启动任务的时候找不到这个类。

此时能够使用例如以下方式解决:

   在你的project里面建立一个lib目录。然后把全部的第三方jar包放到里面去。hadoop会自己主动载入lib依赖里面的jar。 这样就能够在mr启动之前也能够使用第三方jar了。

   方法调用顺序为(以libjars为例): -libjars --->conf.set("tmpjars")--->

DistributedCache.addArchiveToClassPath--->conf.set("mapreduce.job.cache.archives","")

相关文章链接:http://blog.csdn.net/xiaolang85/article/details/11782539

                        http://blog.csdn.net/lazy0zz/article/details/7505712

              

原文地址:https://www.cnblogs.com/mengfanrong/p/5213970.html