初识MapReduce

What is MapReduce?


  MapReduce是一个流行的针对数据密集任务的分布式计算模型,它正在发展成为用来支撑包括Web索引、数据挖掘和科学仿真等领域的大规模数据并行应用的重要编程模型。

  Hadoop是Google公司的MapReduce编程模型的最受欢迎的Java开源实现。在很多公司,Hadoop已经用于大规模数据分析任务,并且经常用在对响应时间要求很严格的作业中。

MapReduce Model


  MapReduce编程模型的设计目标是,使用普通硬件的大型集群处理非结构化数据并产生大规模数据集。它能够在数千个计算节点的集群上处理TB级的数据,进行故障处理,完成任务复制并聚合最终结果。MapReduce模型简单易懂。它是由Google研究院的工程师在21世纪初设计的(http://research.google.com/archive/mapreduce.html)。这一模型包含两个可以在多台机器上并行执行的函数:一个map函数和一个reduce函数。

  要使用MapReduce,程序员需要编写一个用户定义的map函数和一个(同样是用户定义的)reduce函数来表示期望的计算逻辑。map函数读取键值对,执行用户指定的代码,产生中间结果。然后,通过reduce函数的用户指定代码聚合中间结果并输出最终结果。

  MapReduce应用程序的输入按照每个输入规约加入到记录中,每个输入规约产生多个键值对,每个键值对以<k1,v1>形式表述。因此,MapReduce处理过程包含以下两个主要环节。

  map():对所有输入记录逐条执行用户定义的map函数,每条记录产生零到多个中间键值对,也就是<k2,v2>记录。然后所有的<k2,v2>记录都放到<k2,list(v2)>记录中。

  reduce():按照键的不同,对每个map输出的<k2,list(v2)>记录调用一次用户定义的reduce函数;对于每条记录,reduce函数输出零到多个<k2,v3>对。所有的<k2,v3>对最后合并为最终结果。

map和reduce的函数签名如下:

map(<k1,v1>)                  list(<k2,v2>)

reduce(<k2,list(v2)>)  <k2,v3>

  MapReduce编程模型的设计独立于存储系统。MapReduce通过reader从底层存储系统读取键值对。reader从存储系统读取所有记录,并封装成键值对供后续处理。用户可以通过实现相应的reader增加对新存储系统的支持。这种存储独立的设计使MapReduce能够分析保存在不同存储系统中的数据,为异构系统带来了极大便利。

  为了理解MapReduce编程模型,例如:需要从给定的输入文件中获得每一个单词出现的次数。将其转化成为MapReduce作业,单词计数作业通过以下几个步骤定义。

1.输入数据拆分成记录。

2.map函数处理上述记录,并对每个单词生成键值对。

3.合并map函数输出的所有键值对,并根据键分组、排序。

4.将中间结果发送给reduce函数,由reduce函数产生最终输出。

完整过程如下:

  进行键值对的聚合操作时,会产生的大量I/O以及网络流量I/O。为了压缩map和reduce步骤间需要的I/O网络流量,程序员可以选择在map一侧进行预聚合,而预聚合通过提供Combiner函数完成。Combiner函数与reduce函数类似,其不同之处在于,前者并不传递给定键的所有值,而是把传递进来的输入值之和作为输出值传递出去。

MapReduce工作原理

  经过一个或者多个步骤,MapReduce编程模型可以用来处理许多大规模数据问题,还可以更高效地实现MapReduce编程模型来支持使用大量机器处理大量数据的问题。在大数据的背景下,可以处理的数据规模可以大到无法在单机存储。

  在典型的HadoopMapReduce框架下,数据拆分为数据块并分发到集群内的多个节点上。MapReduce框架通过把计算逻辑转移到数据所在的机器,而不是把数据转移到其能够得以处理的机器上,从而充分利用数据的本地性优势。MapReduce应用的大多数输入数据块存放在本地节点,因而能够迅速加载,而且可以并行读取在多个节点上的多个数据块。因此,MapReduce能够达到很高的综合I/O带宽和数据处理速率。

  要启动一项MapReduce作业,Hadoop会创建一个MapReduce应用的实例并把作业提交给JobTracker。然后,作业被拆分为map任务(也叫作mapper)和reduce任务(也叫作reducer)。

  Hadoop启动MapReduce作业时,输入数据集拆分为大小相等的数据块,并采用心跳协议(heartbeatprotocol)分配任务。每个数据块被调度到一个TaskTracker节点上并由一个map任务处理。

  每项任务都在一个工作节点的可用插槽上执行,每个节点会配置固定数量的map插槽和固定数量的reduce插槽。如果所有可用插槽都被占用,挂起的任务就需要等待一些插槽被释放。

  TaskTracker周期性地向JobTracker发送其状态。当TaskTracker节点空闲时,JobTracker节点会给它分配新任务。JobTracker节点在散发数据块时会考虑数据本地性,总是试图把本地数据块分配给TaskTracker节点。如果尝试失败,JobTracker节点会分配一个本地机架或者随机数据块给TaskTracker节点。

  当所有的map函数都执行结束时,运行时系统会将中间键值对分组,并发起一组reduce任务来生成最终结果。接下来执行过程从shuffle阶段转向reduce阶段。在这个最终的reduce阶段,reduce函数被调用来处理中间数据并写最终输出。用户经常使用不同的术语来表述Hadoop的map和reduce任务、子任务、阶段、子阶段。

  map任务包含了两个子任务(map和merge),而reduce任务仅包含一个任务。然而,shuffle和sort首先发生,并由系统完成。每个子任务又拆分为多个子阶段,如readmap、spill、merge、copymap和reducewrite。

影响MapReduce性能的因素

  影响MapReduce输入数据处理时间的因素很多。其中之一是实现map和reduce函数时使用的算法。其他外部因素也可能影响MapReduce性能。根据我们的经验和观察,可能影响MapReduce的主要因素有以下几个。

  硬件(或者资源)因素,如CPU时钟、磁盘I/O、网络带宽和内存大小。

  底层存储系统。输入数据、分拣(shuffle)数据以及输出数据的大小,这与作业的运行时间紧密相关。

  作业算法(或者程序),如map、reduce、partition、combine和compress。有些算法很难在MapReduce中概念化,或者在MapReduce中效率可能会降低。

  运行map任务时,shuffle子任务的中间输出存储在内存缓冲区中,用以减少磁盘I/O。输出的大小可能会超过内存缓冲区而造成溢出,因此需要spill子阶段把数据刷新到本地文件系统。这个子阶段也会影响MapReduce性能,它经常采用多线程技术实现,以便使磁盘I/O利用率最大化并缩减作业的运行时间。

  MapReduce编程模型允许用户使用自己的map和reduce函数指定数据转换逻辑。本模型并不限定map函数产生的中间对在交由reduce函数处理前如何被分组。因此,归并排序(mergesort)算法被用作默认的分类算法。然而,归并排序算法并非总是最高效的,尤其是对分析型任务(如聚合和等值连接)而言,这类任务并不关心中间键的顺序。

  对于MapReduce编程模型来说,分组(grouping)/划分(partitioning)是一个串行的任务。这就意味着在reduce任务可以运行之前,框架需要等待所有map任务完成。

  MapReduce性能是以map和reduce的运行时间为基础的。这是因为典型环境下集群节点数目和节点插槽数目这类参数是不可修改的。其他可能对MapReduce性能构成潜在影响的因素具体如下。

  I/O模式:也就是从存储系统获取数据的方式。从底层存储系统读取数据有以下两种模式。

  直接I/O:通过硬件控制器把数据从本地硬盘缓存中直接读到内存,因而不需要进程间通信成本。

  流式I/O:通过特定进程间通信手段,如TCP/IP和JDBC,从其他正在运行进程(典型情况是存储系统进程)读取数据。

  从提高性能的角度看,使用直接I/O比流式I/O更高效。

参考资料:Hadoop MapReduce性能优化

原文地址:https://www.cnblogs.com/muzhongjiang/p/3940845.html