Hadoop优化 第一篇 : HDFS/MapReduce

比较惭愧,博客很久(半年)没更新了。最近也自己搭了个博客,wordpress玩的还不是很熟,感兴趣的朋友可以多多交流哈!地址是:http://www.leocook.org/

另外,我建了个QQ群:305994766,希望对大数据、算法研发、系统架构感兴趣的朋友能够加入进来,大家一起学习,共同进步(进群请说明自己的公司-职业-昵称)。

1.应用程序角度进行优化

1.1.减少不必要的reduce任务
若对于同一份数据需要多次处理,可以尝试先排序、分区,然后自定义InputSplit将某一个分区作为一个Map的输入,在Map中处理数据,将Reduce的个数设置为空。

1.2.外部文件引用
如字典、配置文件等需要在Task之间共享的数据,可使用分布式缓存DistributedCache或者使用-files

1.3.使用Combiner
combiner是发生在map端的,作用是归并Map端输出的文件,这样Map端输出的数据量就小了,减少了Map端和reduce端间的数据传输。需要注意的是,Combiner不能影响作业的结果;不是每个MR都可以使用Combiner的,需要根据具体业务来定;Combiner是发生在Map端的,不能垮Map来执行(只有Reduce可以接收多个Map任务的输出数据)

1.4.使用合适的Writable类型
尽可能使用二进制的Writable类型,例如:IntWritable, FloatWritable等,而不是Text。因为在一个批处理系统中将数值转换为文本时低效率的。使用二进制的Writable类型可以降低cpu资源的消耗,也可以减少Map端中间数据、结果数据占用的空间。

1.5.尽可能的少创建新的Java对象
a)需要注意的Writable对象,例如下面的写法:

public void map(...) {
    …
    for (String word : words) {
        output.collect(new Text(word), new IntWritable(1));
    }
}

这样会冲去创建对象new Text(word)和new IntWritable(1)),这样可能会产生海量的短周期对象。更高效的写法见下:

class MyMapper … {
    Text wordText = new Text();
    IntWritable one = new IntWritable(1);
    public void map(...) {
        for (String word: words) {
        wordText.set(word);
            output.collect(wordText, one);
        }
    }
}

b)对于可变字符串,使用StringBuffer而不是String

String类是经过final修饰的,那么每次对它的修改都会产生临时对象,而SB则不会。

2. Linux系统层面上的配置调优
2.1. 文件系统的配置
a) 关闭文件在被操作时会记下时间戳:noatime和nodiratime
b) 选择I/O性能较好的文件系统(Hadoop比较依赖本地的文件系统)

2.2. Linux文件系统预读缓冲区大小
命令blockdev

2.3. 去除RAID和LVM

2.4. 增大同时打开的文件数和网络连接数
ulimit
net.core.somaxconn

2.5. 关闭swap分区
在Hadoop中,对于每个作业处理的数据量和每个Task中用到的各种缓冲,用户都是完全可控的。
/etc/sysctl.conf

2.6. I/O调度器选择
详情见AMD的白皮书

3. Hadoop平台内参数调优
Hadoop相关可配置参数共有几百个,但是其中只有三十个左右会对其性能产生显著影响。
3.1. 计算资源优化
a) 设置合理的slot(资源槽位)
mapred.tasktracker.map.tasks.maximum / mapred.tasktracker.reduce.tasks.maximum
参数说明:每个TaskTracker上可并发执行的Map Task和Reduce Task数目
默认值:都是2
推荐值:根据具体的节点资源来看,推荐值是(core_per_node)/2~2*(cores_per_node)
单位:无

3.2. 节点间的通信优化
a) TaskTracker和JobTracker之间的心跳间隔
这个值太小的话,在一个大集群中会造成JobTracker需要处理高并发心跳,可能会有很大的压力。
建议集群规模小于300时,使用默认值3秒,在此基础上,集群规模每增加100台,会加1秒。
b) 启用带外心跳(out-of-band heartbeat)
mapreduce.tasktracker.outofband.heartbeat
参数说明:主要是为了减少任务分配延迟。它与常规心跳不同,一般的心跳是一定时间间隔发送的,而带外心跳是在任务运行结束或是失败时发送,这样就能在TaskTracker节点出现空闲资源的时候能第一时间通知JobTracker。

3.3. 磁盘块的配置优化
a) 作业相关的磁盘配置:mapred.local.dir
参数说明:map本地计算时所用到的目录,建议配置在多块硬盘上
b) 存储相关的磁盘配置(HDFS数据存储):dfs.data.dir
参数说明:HDFS的数据存储目录,建议配置在多块硬盘上,可提高整体IO性能
例如:

<property>
  <name>dfs.name.dir</name>
  <value>/data1/hadoopdata/mapred/jt/,/data2/hadoopdata/mapred/jt/</value>
</property>

c) 存储相关的磁盘配置(HDFS元数据存储):dfs.name.dir

参数说明:HDFS的元数据存储目录,建议设置多目录,每个多目录都可保存元数据的一个备份
注:要想提升hadoop整体IO性能,对于hadoop中用到的所有文件目录,都需要评估它磁盘IO的负载,对于IO负载可能会高的目录,最好都配置到多个磁盘上,以提示IO性能

3.4. RPC Handler个数和Http线程数优化
a) RPC Handler个数(mapred.job.tracker.handler.count)
参数说明:JobTracker需要并发的处理来自各个TaskTracker的RPC请求,可根据集群规模和并发数来调整RPC Handler的个数。
默认值:10
推荐值:60-70,最少要是TaskTracker个数的4%
单位:无
b) Http线程数(tasktracker.http.threads)
在Shuffle阶段,Reduce Task会通过Http请求从各个TaskTracker上读取Map Task的结果,TaskTracker是使用Jetty Server来提供服务的,这里可适量调整Jetty Server的工作线程以提高它的并发处理能力。
默认值:40
推荐值:50-80+

3.5. 选择合适的压缩算法
mapred.compress.map.output / Mapred.output.compress
map输出的中间结果时需要进行压缩的,指定压缩方式(Mapred.compress.map.output.codec/ Mapred.output.compress.codec)。推荐使用LZO压缩。

3.6. 启用批量任务调度(现在新版本都默认支持了)
a) Fair Scheduler
mapred.fairscheduler.assignmultiple
b) Capacity Scheduler

3.7. 启用预读机制(Apache暂时没有)
Hadoop是顺序读,所以预读机制可以很明显的提高HDFS的读性能。
HDFS预读:
dfs.datanode.readahead :true
dfs.datanode.readahead.bytes :4MB
shuffle预读:
mapred.tasktracker.shuffle.fadvise : true
mapred.tasktracker.shuffle.readahead.bytes : 4MB

3.8.HDFS相关参数优化
1) dfs.replication
参数说明:hdfs文件副本数
默认值:3
推荐值:3-5(对于IO较为密集的场景可适量增大)
单位:无
2) dfs.blocksize
参数说明:
默认值:67108864(64MB)
推荐值:稍大型集群建议设为128MB(134217728)或256MB(268435456)
单位:无
3) dfs.datanode.handler.count
参数说明:DateNode上的服务线程数
默认值:10
推荐值:
单位:无
4) fs.trash.interval
参数说明:HDFS文件删除后会移动到垃圾箱,该参数时清理垃圾箱的时间
默认值:0
推荐值:1440(1day)
单位:无
5) io.sort.factor
参数说明:当一个map task执行完之后,本地磁盘上(mapred.local.dir)有若干个spill文件,map task最后做的一件事就是执行merge sort,把这些spill文件合成一个文件(partition)。执行merge sort的时候,每次同时打开多少个spill文件由该参数决定。打开的文件越多,不一定merge sort就越快,所以要根据数据情况适当的调整。
默认值:10
推荐值:
单位:无
6) mapred.child.java.opts
参数说明:JVM堆的最大可用内存
默认值:-Xmx200m
推荐值:-Xmx1G | -Xmx4G | -Xmx8G
单位:-Xmx8589934592也行,单位不固定
7) io.sort.mb
参数说明:Map Task的输出结果和元数据在内存中占的buffer总大小,当buffer达到一定阀值时,会启动一个后台进程来对buffer里的内容进行排序,然后写入本地磁盘,形成一个split小文件
默认值:100
推荐值:200 | 800
单位:兆
8) io.sort.spill.percent
参数说明:即io.sort.mb中所说的阀值
默认值:0.8
推荐值:0.8
单位:无
9) io.sort.record
参数说明:io.sort.mb中分类给元数据的空间占比
默认值:0.05
推荐值:0.05
单位:无
10) Mapred.reduce.parallel
参数说明:Reduce shuffle阶段copier线程数。默认是5,对于较大集群,可调整为16~25
默认值:5
推荐值:16~25
单位:无

4.系统实现角度调优
https://www.xiaohui.org/archives/944.html
主要针对HDFS进行优化,HDFS性能低下的两个原因:调度延迟和可移植性


4.1. 调度延迟
关于调度延迟主要是发生在两个阶段:
a) tasktracker上出现空余的slot到该tasktracker接收到新的task;
b) tasktracker获取到了新的Task后,到连接上了datanode,并且可以读写数据。
之所以说这两个阶段不够高效,因为一个分布式计算系统需要解决的是计算问题,如果把过多的时间花费在其它上,就显得很不合适,例如线程等待、高负荷的数据传输。
下面解释下会经历上边两个阶段发生的过程:
a) 当tasktracker上出现slot时,他会调用heartbeat方法向jobtracker发送心跳包(默认时间间隔是3秒,集群很大时可适量调整)来告知它,假设此时有准备需要执行的task,那么jobtracker会采用某种调度机制(调度机制很重要,是一个可以深度研究的东东)选择一个Task,然后通过调用heartbeat方法发送心跳包告知tasktracker。在该过程中,HDFS一直处于等待状态,这就使得资源利用率不高。
b) 这个过程中所发生的操作都是串行化的:tasktracker会连接到namenode上获取到自己需要的数据在datanode上的存储情况,然后再从datanode上读数据,在该过程中,HDFS一直处于等待状态,这就使得资源利用率不高。
若能减短hdfs的等待时间;在执行task之前就开始把数据读到将要执行该task的tasktracker上,减少数据传输时间,那么将会显得高效很多。未解决此类问题,有这样几种解决方案:重叠I/O和CPU阶段(pipelining),task预取(task prefetching),数据预取(data prefetching)等。

4.2. 可移植性
Hadoop是Java写的,所以可移植性相对较高。由于它屏蔽了底层文件系统,所以无法使用底层api来优化数据的读写。在活跃度较高的集群里(例如共享集群),大量并发读写会增加磁盘的随机寻道时间,这会降低读写效率;在大并发写的场景下,还会增加大量的磁盘碎片,这样将会大大的增加了读数据的成本,hdfs更适合文件顺序读取。
对于上述问题,可以尝试使用下面的解决方案:
tasktracker现在的线程模型是:one thread per client,即每个client连接都是由一个线程处理的(包括接受请求、处理请求,返回结果)。那么这一块一个拆分成两个部分来做,一组线程来处理和client的通信(Client Threads),一组用于数据的读写(Disk Threads)。
想要解决上述两个问题,暂时没有十全十美的办法,只能尽可能的权衡保证调度延迟相对较低+可移植性相对较高。


4.3. 优化策略:Prefetching与preshuffling
a) Prefetching包括Block-intra prefetching和Block-inter prefetching:
Block-intra prefetching:对block内部数据处理方式进行了优化,即一边进行计算,一边预读将要用到的数据。这种方式需要解决两个难题:一个是计算和预取同步,另一个是确定合适的预取率。前者可以使用进度条(processing bar)的概念,进度条主要是记录计算数据和预读数据的进度,当同步被打破时发出同步失效的通知。后者是要根据实际情况来设定,可采用重复试验的方法来确定。
Block-inter prefetching:在block层面上预读数据,在某个Task正在处理数据块A1的时候,预测器能预测接下来将要读取的数据块A2、A3、A4,然后把数据块A2、A3、A4预读到Task所在的rack上。

b) preshuffling
数据被map task处理之前,由预测器判断每条记录将要被哪个reduce task处理,将这些数据交给靠近reduce task的map task来处理。

参考资料:
cloudera官方文档
http://blog.cloudera.com/blog/2009/12/7-tips-for-improving-mapreduce-performance/
AMD白皮书(较为实用)
http://www.admin-magazine.com/HPC/content/download/9408/73372/file/Hadoop_Tuning_Guide-Version5.pdf

国内博客(大部分内容都是AMD白皮书上的翻译):
http://dongxicheng.org/mapreduce/hadoop-optimization-0/
http://dongxicheng.org/mapreduce/hadoop-optimization-1/

原文地址:https://www.cnblogs.com/leocook/p/hadoop_optimize01.html