Hadoop大数据处理读书笔记

几个关键性的概念

  1. 云计算:是指利用大量计算节点构成的可动态调整的虚拟化计算资源。通过并行化和分布式计算技术,实现业务质量可控的大数据处理的计算技术。
  2. NameNode:是HDFS系统中的管理者。它负责管理文件系统的命名空间。维护文件系统的文件树以及全部的文件和文件夹的元数据。这些信息存储在NameNode维护的两个本地磁盘文件:命名空间镜像文件和编辑日志文件。

    同一时候,NameNode中还保存了每一个文件与数据块所在的DataNode的相应关系,这些信息被用于其它功能组件查找所需文件资源的数据server。

  3. SecondNameNode:与NameNode保持通信。依照一定时间间隔保存文件系统元数据的快照。

    当NameNode发生问题时,系统管理者能够通过手工配置的形式将保存的元数据快照恢复到又一次启动的NameNode中,减少数据丢失的风险。

  4. DataNode:是HDFS中保存数据的节点。HDFS中的数据通常被切割为多个数据块。以冗余备份的形式存储在多个DataNode中。

    DataNode定期向NameNode报告其存储的数据块列表,以备使用者通过直接訪问DataNode获得相应的数据。

  5. JobClient。

    JobClient是基于MapReduce接口库编写的client程序,负责提交MapReduce作业。

  6. JobTracker。是应用于MapReduce模块之间的控制协调者,它负责协调MapReduce作业的运行。

    当一个MapReduce作业提交到集群中。JobTracker负责确定兴许运行计划。包含须要处理哪些文件、分配任务的Map和Reduce运行节点、监控任务的运行、又一次分配失败的任务等。每一个Hadoop集群中仅仅有一个JobTracker。

  7. TaskTracker。TaskTracker负责运行由JobTracker分配的任务,每一个TaskTracker能够启动一个或多个Map或Reduce任务。同一时候,TaskTracker和JobTracker间通过心跳(HeartBeat)机制保持通信,以维持整个集群的运行状态。
  8. MapTask,ReduceTask。是由TaskTracker启动的负责详细运行Map任务和Reduce任务的程序。
MapReduce任务的运行过程

  1. MapReduce程序启动一个JobClient实例以开启整个MapReduce作业(Job)
  2. JobClient通过getNewJobId()接口向JobTracker发出请求。以获得一个新的作业ID。

  3. JobClient依据作业请求指定的输入文件计算数据块的划分,并将完毕作业须要的资源,包括JAR文件、配置文件、数据块,存放到HDFS中属于JobTracker的以作业ID命名的文件夹下,一些(如JAR文件)可能会以冗余备份的形式存放在多个节点上。
  4. 完毕上述准备后。JobClient通过调用JobTracker的submitJob()接口提交此作业。

  5. JobTracker将提交的作业放入一个作业队列中等待进行作业调度以完毕作业初始化工作。

    作业初始化主要是创建一个代表此作业的执行对象。作业执行对象中封装了作业包括的任务和任务执行状态记录信息用于兴许跟踪相关任务的状态和执行进度。

  6. JobTracker还须要从HDFS文件系统中取出JobClient放好的输入数据,并依据输入数据创建相应数量的Map任务,同一时候依据JobConf配置文件里定义的数量生成Reduce任务。
  7. 在TaskTracker和JobTracker间通过心跳机制维持通信,TaskTracker发送的心跳消息中包括了当前是否可执行新的任务的信息,依据这个信息。JobTracker将Map任务和Reduce任务分配到空暇的TaskTracker节点。
  8. 被分配了任务的TaskTracker从HDFS文件系统中取出所需的文件,包括JAR程序文件和任务相应的数据文件,从存入本地磁盘。并启动一个TaskRunner程序实例准备执行任务。

  9. TaskRunner在一个新的Java虚拟机中依据任务类型创建出MapTask或ReduceTask进行运算。

    在新的Java虚拟机中执行MapTask和ReduceTask的原因是避免这些任务的执行异常影响TaskTracker的正常执行。MapTask和ReduceTask会定时与TaskRunner进行通信报告进度,直到任务完毕。

基于云计算的大数据处理架构


基于云计算的大数据处理技术的应用
百度主要应用成果
HCE(Hadoop c++ Extension)——基于C++的MapReduce执行环境(攻克了Java对内存管理的低效)
HDFS2——分布式NameNode实现(攻克了NameNode压力大问题)
HDFS的透明压缩存储(解决HDFS系统中文件存储占用磁盘空间问题)
DISQL(Distributed SQL)——大数据分析语言
阿里巴巴
腾讯
华为
中国移动
三、MapReduce计算模式
MapReduce原理
Map:<k1,v1>→[<k2,v2>]
Reduce:<k2,[v2]>→[<k3,v3>]
注:<...>代表keyword/值数据对,[...]代表列表。
MapReduce处理过程图

MapReduce工作机制

  1. 作业提交:用户编写MapReduce程序创建新的JobClient实例(步骤1.1)。

    JobCl实例创建后,向JobTracker请求获得一个新的Jobld,用于标识本次MapReduce作业(步骤1.2)。然后JobClient检査本次作业指定的输入数据和输出文件夹是否正确。

    在检查无误后,JobClient将执行作业须要的相关资源。包括本次作业相关的配置文件、输入数据分片的数量。以及包括Mapper 和 Redllcer类的 JAR文件存入分布式文件存储系统中(步骤1.3),当中JAR文件将以多个备份的形式存放。完毕以上工作后。JobClient向JobTracker发出作业提交请求 (步骤1.4)。

  2. 作业初始化:作为系统主控节点。JobTracker会收到多个JobClient发出的作业请求,因此JobTracker实现了一个队列机制处理多个请求。

    收到的请求会放入一个内部队列。由作业调度器进行调度。JobTracker为作业进行初始化工作(步骤2.1)。

    初始化的内容是创建一个代表此作业的JoblnProgress实例,用 于兴许跟踪和调度此作业。JobTracker要从分布式文件存储系统中取出JobClient存放的输入数据分片信息(步骤2.2),以决定须要创建的Map任务数量,并创建相应的一批TaskInProgress实例用于监控和调度Map任务。

    而须要创建的Reduce任务数量和相应的TaskInProgress实例由配置文件里的參数决定。

  3. 任务分配:MapReduce框架中的任务分配机制是採用"拉"(pull)的机制实现的。 在任务分配之前,负责执行Map任务或Reduce任务的TaskTracker节点均已经启动。

    TaskTracker —直通过RPC向JobTracker发送心跳消息询问有没有任务可做(步骤3)。假设JobTracker的作业队列不为空。则TaskTracker发送的心跳消息将会获得JobTracker给它派发的任务。

    因为TaskTracker节点的计算能力(由内核数量和内存大小决定)是有限的, 因此每一个TaskTracker节点可执行Map任务和Reduce任务的数量也是有限的,即每一个 TaskTracker有两个固定数量的任务槽,分别相应Map任务和Reduce任务。在进行任务分 配时。JobTracker优先填满TaskTracker的Map任务槽,即仅仅要有空暇Map任务槽就分配 一个Map任务,Map任务槽满了后才分配Reduce任务。

  4.  Map任务运行:在MapTaskTracker节点收到JobTracker分配的Map任务后。 将运行一系列操作以运行此任务。首先,创建一个Tasklnprogress对象实例以调度和监控任务。

    然后将作业的JAR文件和作业的相关參数配置文件从分布式文件存储系统中取出。并拷贝到本地工作文件夹下 (JAR文件里的内容需经过解压)(步骤4.1)。 完毕这些准备工作后。TaskTracker新建一个TaskRunner实例来运行此Map任务(步骤4.2 )。 TaskRunner将启动一个单独的JVM, 并在当中启动MapTask运行用户指定的map函数(步骤4.3)。使用单独的JVM运行MapTask的原因是为了避免MapTask的异常影响 TaskTracker的正常运行。MapTask计算获得的数据,定期存入缓存中(步骤4.4), 并在缓存满的情况下存入本地磁盡中(步骤 4.5)。在任务运行时。MapTask定时与 TaskTracker通信报告任务进度(步骤4.6),直到任务全部完毕,此时全部的计算结果会存入本地磁盘中。

  5. Reduce任务运行:在部分Map任务运行完毕后。JobTracker即将依照上面第3步相同的机制開始分配Reduce任务到Reduce TaskTracker节点中(步骤5.1)。与Map任务启动过程类似。Reduce TaskTracker相同会生成在单独JVM中的ReduceTask以运行用户指定的Reduce函数(步骤5.2、步骤5.3)。同一时候ReduceTask会開始从相应的Map TaskTracker 节点中远程下载中间结果的数据文件(步骤5 4)。直到此时,Reduce任务还没有真正開始运行。而不过做好运行环境和数据的准备工作。唯独当全部Map任务运行完毕后, JobTracker才会通知所有Reduce TaskTracker节点開始Reduce任务的运行。相同, ReduceTask定时与TaskTracker通信报告任务进度。直到任务所有完毕(步骤5.6)。
  6. 作业完毕:在Reduce阶段运行过程中。每一个ReduceTask会将计算结果输出到分布式文件存储系统中的暂时文件(步骤5.5)。ReduceTask完毕时,这些暂时文件会合并为一个终于输出结果文件。JobTracker在收到作业包括的所有任务的完毕通知后(通过每一个TaskTraeker与JobTracker间的心眺消息)。会将此作业的状态设置为"完毕"。当此后的JobCient的第一个状态轮询请求到达时,将会获知此作业已经完毕(步骤6.1),于是JobClient会通知用户程序整个作业完毕并显示必要的信息(步骤6.2)。

作业调度:
  1. 先进先出调度器:默认的调度器。是支持优先级的先进先出调度器。不支持资源抢占。
  2. 公平调度器。

    公平调度器的设计目标是支持系统的全部用户能够公平的共享集群的计算能力。

  3. 能力调度器。採用多队列的形式组织集群中的计算资源,这些队列能够採用层次结构连接在一起。

  4. 自己定义的调度器
异常处理:
  • 任务异常(两种状态:失败和终止。任务出现异常后,TaskTracker会将此任务的失败信息报告给JobTracker。JobTracker会分配新的节点运行此任务。

失败:
  1. 用户编写的map或redeuce函数中的代码不对
  2. 任务所在的JVM出现执行异常,这通常也是某种代码异常导致的。
  3. 任务进度更新超时。

终止:
  1. 备份任务时,MapReduce框架为了避免某个未失败但运行缓慢的任务影响整个作业的运行速度而设计了备份任务机制。

  2. 当TaskTracker出现异常无法运行时。

  3. 用户通过命令行或Web页面手工终止或取消运行任务时。

  • TaskTracker异常(假设JobTracker超过最大时间间隔没有收到TaskTracker的心跳消息时,则觉得TaskTracker出现了异常。已完毕的任务会正常返回,未完毕的任务则又一次分配TaskTracker节点又一次运行。

    为了避免TaskTracker异常重复出现,MapReduce框架设定了黑名单机制。

  • JobTracker异常(眼下还没有能应对JobTracker异常的机制)
MapReduce应用开发流程

多个MapReduce过程的组合模式
  • 作业链(按顺序运行)——JobClient.runJob
JobClient.runJob(config1);
JobClient.runJob(config2);
  • 作业图(DAG)——jobControl
jobMapReduce3.addDependingJob(jobMapReduce1);
jobMapReduce3.addDependingJob(jobMapReduce2);
jobControl.addJob(jobMapReduce1);
jobControl.addJob(jobMapReduce2);
jobControl.addJob(jobMapReduce3);
jobControl.run();
  • Map/Reduce链——ChainMaper和ChainReducer
  1. 计数(Counting)——计算每条记录某个属性的函数表达式值,比如求和、平均值等。
  2. 分类(Classfication)——在给定一计算函数的情况下,将通过此计算函数计算后饿到结果值同样的实体放在一起(或进行兴许处理)
  3. 过滤处理(Filtering)——将符合某个条件的记录取出,或者进行格式转换。
  4. 排序(Sorting)——将记录依照一定规则排序后输出。
  5. 去重计数(Distinct Counting)——在计算某几个属性组合后去掉同样的反复组合后,求当中某一属性的统计值
  6. 相关计数(Cross-Correlation)——计算数组中记录以一定条件要求成对出现的次数或概率。

MapReduce算法实践
  1. 最短路径算法(Dijkstra算法)
  2. 反向索引算法
  3. PageRank算法
MapReduce性能调优


map函数在运行时。输出数据首先是保存在缓存中,这个缓存的默认大小是100MB,由參数io.sort.mb来控制。当缓存使用量达到一定比例时,缓存中的数据将写入磁盘中,这个比例是由參数io.sort.spill.percent控制。缓存中的数据每次输出到磁盘时会生成一个暂时文件,多个暂时文件合并后生成一个map输出文件,參数io.sort.factor制定最多能够有多少个暂时文件被合并到输出文件里。性能调优參数:
參数 类型 默认值 说明
Map阶段      
io.sort.mb int 100 map输出的缓存大小。单位为MB
io.sort.spill.percent float 0.8 map输出缓存占用超过此比例将開始写入磁盘
io.sort.factor int 10 合并多个暂时输出文件的数量。可增大
min.num.spills.for.combine int 3 输出暂时文件达到此数量时会运行一次combine操作
tasktracker.http.threads int 40 tasktracker可用于输出map文件的http线程数
Reduce阶段      
mapred.reduce.parallel.copies int 5 可读取多个map输出的线程数
mapred.reduce.copy.backoff int 300 reduce读取map输出的失败超时时间,s为单位
io.sort.factor int 10 处理之前合并输入文件的最大数量
mapred.job.shuffle.input.buffer.percent float 0.7 存储map输出数据的缓存占整个内存的比例
mapred.job.shuffle.merge.percent float 0.6 存储map输出数据的缓存的占用比例阀值。超过则存入磁盘
mapred.inmem.merge.threshold int 1000 当map输出文件超过此数量时,进行合并并存入磁盘
mapred.job.reduce.input.buffer.percent float 0.0 在reduce节点的内存中保持map输出数据的缓冲占整个内存的百分比,增大能够降低磁盘读写
mapred.child.java.opts int 200 map或reduce任务可使用的内存大小,默觉得200MB,可适当增大
io.file.buffer.size int 4096 进行磁盘I/O操作的是缓存大小,默认4kb,可提高为64kb或128kb

使用Combiner降低传输数据
启用数据压缩(DefaultCodec:zlib。GzipCodec:Gzip;BZip2Codec:bzip2)
參数 类型 默认值 说明
mapred.compress.map.output Boolean false 是否启用map输出压缩
mapred.map.output.compression.codec Class name org.apache.hadoop.io.compress.DefaultCodec map输出压缩类
使用预測运行功能:在任务运行过程中,hadoop会检測全部任务的进度和完毕情况,当出现某个任务运行进程远慢于整个系统平均进度时,hadoop会将在还有一个节点上启动一个同样的备份任务,并与原始任务并行运行。

当原始任务和备份任务的当中一个完毕时,还有一个任务被终止。

參数 类型 默认值 说明
mapred.map.tasks.speculative.execution Boolean true 是否启用map任务的预測运行机制
mapred.reduce.tasks.speculative.execution Boolean true 是否启用reduce任务的预測运行机制

重用JVM:默认设置。每一个JVM仅仅能够单独执行一个Task进程,其主要目的是避免某个任务的崩溃影响其它任务或整个TaskTracker的正常执行。

但MapReduce框架也能够同意一个JVM执行多个任务。设置參数mapred.job.reuse.jvm.num.tasks。或调用JobConf类的setNumTasksToExecutePerJvm接口。当设置为-1时,任务数量将没有限制。对于一些map函数初始化简单却执行频繁的作业,能够考虑。

分布式文件系统应满足:

  1. 使用大量低成本构建的分布式执行环境
  2. 可以应对大量并发用户訪问
  3. 可以处理超乎平常的文件大小
  4. 提供足够大的系统吞吐量

数据读取过程


  1. client生成一个HDFS类库中的DistributedFileSystem对象实例,并使用此实例的 open()接口打开一个文件。 
  2. DistributedFileSystem通过RPC向NameNode发出请求,以获得文件相关的数据块位置信息。NameNode将包括此文件相关数据块所在的 DataNode 地址。经过与 Client 相关的距离(參见4.4.2节性能优化)进行排序后,返回给DistributedFilesystem。
  3. DistributedFilesystem在获得数据块相关信息后。生成一个FSDatalnputStream对象实例返回给client。 此实例封装了一个DFSInputStream对象,负责存储数据块信息及 DataNode地址信息,并负责兴许的文件内容读取处理。 
  4. Client向FSDataInputStream发出读取数据的read()调用。

     

  5. 在收到read()调用后。FSDatalnputStream封装的DFSInputstream选择第一个数据块的近期的DataNode, 并读取对应的数据信息,返回给client。在数据块读取完毕后, DFSInputStream负责关闭到对应DataNode的链接。
  6. DFSInputStream将持续选择兴许数据块的近期DataNode节点。并读取数据返回给client。 直到最后一个数据块读取完毕。
  7. 当client读取全然部数据后,将调用FSDatalnputstream的close()接口结束本次文件读取操作。
  • 当读取某个DataNode出现问题时。DFSInputStream将选取下一个包括此数据块的近期的DataNode。
数据写入过程
  1. client生成一个HDFS类库中的DistributedFileSystem对象实例,并使用此实例的 create()接口打开创建一个文件。 
  2. DistributedFileSystem通过RPC向NameNode发出创建文件请求,NameNode在确认此文件没有重名文件。且Client有写入权限后。在命名空间中创建此文件的相应记录。

    在此过程中假设出现异常。NameNode将返回IOException。

  3. DistHbutedFilesystem在获得NameNode的成功返回后,生成一个FSDataoutputStream对象实例返回给client。

    此实例封装了一个DFSOutputstream对象。负责兴许的文件内容写入处理。 

  4. Client向FSDataInputStream发出写入数据的write()调用及须要写入文件的数据。

    DFSOutputStream在收到数据后会将数据拆分后放入一个数据队列。

     

  5. Datastreamer负责从数据队列中不断取出数据。准备写入DataNode中。但在写入 之前。DataStreamer须要从NameNode请求分配一些存放数据的数据块信息以及适合存放这些数据块的DataNode地址。
  6. 对于每一个数据块,NameNode会分配若干个DataNode以复制存储数据块,比如要将数据块2存入3个DataNode节点。

    Datastreamer会将数据块写入第一个DataNode。 这个DataNode会将数据传给第二个(步骤6.1),第二个传给第三个(步骤6.2),以完毕整 个DataNode链的数据写入。

  7. 每一个DataNode完毕写入后。会向Datastreamer报告已完毕(步骤7、步骤7.1和 步骤7.2)。同一时候向NameNode报告自己完毕了一个数据块的写人(步骤7.3)。

    步骤6和步骤7会循环运行。直到全部数据块写入完毕.

  8. 当Client完毕全部数据写入后。将调用FSDataInputStream的close()接口结束本次文件写入操作。
  • 当某个DataNode出现问题写入失败。那么故障节点将从DataNode链中删除,NameNode会分配还有一个DataNode完毕此数据块的写入,仅仅要有一个写入成功,本次操作也被视为完毕。
基于命令行的文件管理
  • hadoop fs -cmd<args>
通过API操作文件
  1. JAVA
  2. C/C++
  3. HTTP

HDFS性能优化

  1. 调整数据块尺寸
  2. 规划网络与节点
  3. 调整服务队列数量
  4. 预留磁盘空间
  5. 存储平衡(start-balancer.sh)
  6. 依据节点功能优化磁盘配置
  7. 其它參数
HDFS小文件存储问题

(未完待续)





原文地址:https://www.cnblogs.com/tlnshuju/p/7266721.html