深入Hadoop

## 1、HDFS原理

![1542796586771](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1542796586771.png)

### 1)三大组件

NameNode、DataNode、SecondaryNameNode

### 2)NameNode

#### a.作用

在储元数据(文件名、创建时间、大小、权限、文件与block块映射关系)

#### b.数据存储目录

数据存储在hdfs-site.xml的dfs.namenode.name.dir属性配置中

1.fsimage:镜像文件,存储某时段内存元数据信息和文件与Block块的映射关系(NameNode第一映射关系
​ 2.edits:编辑日志文件
​ 3.seen_txid:操作事务id
​ 4.VERSION:存储命名空间ID、集群ID等信息

#### c.多次格式化namenode的问题原因解释

hdfs格式化会改变VERSION文件中的clusterID,首次格式化时datanode和namertode会产生相同的clusterID;如果重新执行格式化,namenode的clusterID改变,就会与datanode的clusterID不一致,如果重新启动或读写hdfs就会挂掉

### 3)DataNode

#### a.作用

存储真实的数据信息

#### b.数据存储目录

dfs.datanode.data.dir 存储内容:数据本身和数据的长度、校验和和时间戳。

#### c.文件块(Block):

基本的存储单元,默认大小是128M,通过dfs.blocksize属性配置

#### d.副本策略

<1>默认是3个副本,通过dfs.replication配置
​ <2>在放形式:
​ I.如果客户端在集群中,第一个副本放到客户端机器上:否则第一个副本随机挑选一个不忙的机器

​ II.第二个副本放到和第一个副本不同的机架上的一个服务器上​

​ III.第三个副本放到和第二个副本相同机架不同服务器上

​ IV.如果还有更多副本,就随机存放

#### e.DataNode与NameNode通信

<1>DataNode启动后向NameNode进行注册,注册完后周期性(1小时)向NN上传块报告(BlockReport)

BlockReport数据内容:Block块与DN的映射关系(第二映射关系)

作用:DN通过上传块报告,能更新NN内存中的映射关系
<2>DN发送心跳(3s)给NN,心跳返回结果带有的NN下发给DN超过10m,就认为DN不可用了

### 4)secondaryNameNode

#### a.作用

减轻NameNode压力,将edits编辑日志文件和fsimage镜像文件进行合并

#### b.执行流程

<1>SNN周期性地向NN发送请求,NN生成一个新的edits文件

<2>NN将edits文件和fsimage文件发给SNN

<3>SNN将fsimage文件加载到内存,合并edits文件,生成新的fsimage.ckpt文件

<4>SNN将新的fsimage.ckpt文件发给NN

<5>NN用新的fsimage.ckpt代替旧的fsimage文件,重命名edits.new为edits文件

### 5)HDFS读写流程

#### a.写入流程

![1542856142022](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1542856142022.png)

<1>Client客户端向远程NN发送RPC请求

<2>NN检查文件是否存在,创建者是否有权限,成功后创建一个文件,失败抛异常

<3>Client端将文件切分成多个切片,并以队列方式管理这些切片,并向NN申请新的blocks,获取用来存储切片的DN列表
​ <4>Client端将切片以管道(pipeline)的方式写到DN的block块上,DN根据副本策略将切片传递给其他DN

<5>DN切片写入成功后,给Client端一个ack包。成功后继续写剩下的切片;不成功会重试直到失败,将失败的DN从pipeline中删除

#### b.读取流程

![1542856213679](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1542856213679.png)

<1>Client客户端向远程NN发送RPC请求

<2>NN查找文件对应的block块及存放的DN地址,返回给Client端

<3>Client端以pipeline的方式从DN中读取各个block块数据

<4>Client端读取block块后,使用校验和验证,判断block是否损坏。如果损坏,读取另外DN上的数据;如果没有损坏,通知NN,继续下一个block块读取

### 6)安全模式

#### a.含义

客户端只能进行查看,不能进行写入、删除等操作

#### b.作用

HDFS集群启动后会先进入安全模式,检查数据块和DN的完整性

#### c.判断条件

<1>副本数达到要求的block块数占所有block块总数的比例是否达到配置要求配置项

dfs.namenode.replication.min 最小block副本数,默认是1

dfs.namenode.safemode.threshold-pct 百分比,默认0.999f
​ <2>可用的DN数否达到配置要求
​ 配置项:dfs.namenode.safemode.min.datanodes 默认是0

<3>1,2条件满足后维持的时间是否达到配置要求
​ 配置项:dfs.namenode.safemode.extension默认是1ms

#### d.命令操作

bin/hdfs dfsadmin-safemode <command>
​ command选项:
​ get:查看当前状态
​ enter:进入安全模式
​ leave:强制离开安全模式
​ wait:等待

## 2、HDFS命令行及JavaAPI使用

### 1)HDFS命令行

#### a.bin/hdfs dfs命令

>-help【cmd】:显示命令的帮助信息
>​ -mkdir【-p】:创建目录,p可以创建不存在的父路径
>​ -Is(r):显示当前目录下的所有文件
>​ -put:本地文件上传到HDFS上
>​ -copyFromLocal:本地文件复制到hdfs
>
>-moveFromLocal:本地文件移动到hdfs
>​ -du(s):显示目录中所有文件大小;只指定一个文件时,显示此文件大小
>​ -count【-q】:显示当前目录下的所有文件大小

```shell
[hadoop@master hadoop-2.7.3]$ bin/hdfs dfs -mkdir /text1/1
mkdir: `/text1/1': No such file or directory
[hadoop@master hadoop-2.7.3]$ bin/hdfs dfs -mkdir -p /text1/1
[hadoop@master hadoop-2.7.3]$ bin/hdfs dfs -ls /text
[hadoop@master hadoop-2.7.3]$ bin/hdfs dfs -ls /text1
Found 1 items
drwxr-xr-x - hadoop supergroup 0 2018-11-22 01:30 /text1/1
```

>-mv:移动文件/到目标目录
>​ -cp:复制文件/目录到目标目录
>​ -rm(r):删除文件/目录 -r递归删除
>​ -get[-ignoreCrc]:复制文件到本地,可以忽略crc校验
>​ -cat:在终端显示文件内容
>​ -text:在终端显示文件内容,将源文件输出为文本格式
>​ -copy ToLocal[-ignoreCrc]:复制文件到本地
>​ -moveToLocal:移动文件到本地

#### b.bin/hdfs dfsadmin命令

> -report:查看文件系统的基本信息和统计信息
> ​ -safemode enter I leave I get Iwait:安全模式命令
> ​ -refreshNodes:重新读取hosts和exclude文件,在新增节点或注销节点时用
> ​ -finalizeUpgrade:终结HDFS的升级操作
> ​ -setQuota<quota><dirname>:为每个目录<dirname>设定配额<quota>
> ​ -clrQuota<dirname>:为每个目录<dirname>清除配额设定

#### c.HDFS java API

API文档
​ http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.9.0/api/index.html
​ 访问入口:FileSystem

> 创建目录:mkdirs
> ​ 上传文件:create/put或copyFromLocalFile
> ​ 列出目录的内容:listStatus
> ​ 显示文件或目录的元数据:getFileStatus
> ​ 下载文件:open/geta或copyToLocalFile
> ​ 删除文件或目录:delete

a.Hadoop默认支持权限控制,可将其关闭

​ hdfs-site.xml文件:dfs.permissions.enables设置成false

注:需要配置到服务器hdfs-site.xml中,服务器重启

b.获取文件元数据信息

​ 副本策略:dfs:replication配置项在客户端进行指定

c.下载文件的时候调用copyTOLocalFile的问题

​ 由于本地是Windows系统,没有安装hadoop环境,所以使用第四个参数指定使用本地文件系统

​ filesystem.sopyToLocalFile(false,new Path(srcPathName),new Path(dstPathName),true);

## 3、YARN资源调度与隔离

#### 1).资源调度(resouce scheduler)

<1>FIFO Scheduler:把应用按提交的顺序排成一个先进先出队列

![1542960252074](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1542960252074.png)

<2>Capacity Scheduler(雅虎):预先划分为多个队列。每个队列按FIFO(默认)或DRF方式分配资源

Apache版本默认使用

![1542960281295](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1542960281295.png)

<3>Fair Scheduler(FaceBook):动态划分世可预先划分队列。每个队列按Fair(默认)或FIFO或DRF(主资源公平算法)方式分配分配资源

CDH版本默认使用

![1542960309281](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1542960309281.png)

注:DFS算法(主资源公平算法)

​ 作业1:CPU资源是主资源

​ 作业2:内存资源是主资源

##### <4>capacity Scheduler配置(capacity-scheduler.xml)

​ ①配置capacity-site.xml

​ yarn.scheduler.capacity.root.queues:prood,dev

​ yarn.scheduler.capacity.root.dev.queues:eng,sciences

​ yarn.scheduler.capacity.root.prod.capacity:40

​ yarn.scheduler.capacity.root.dev.capacity:60

​ yarn.scheduler.capacity.root.dev.maximun-capacity:75

​ yarn.scheduler.capacity.root.dev.eng.capacity:50

​ yarn.scheduler.capacity.root.dev.science.capacity:50

​ 1.dev队列分成eng和science子队列
​ 2.maxmum-capacity属性指定最大容量,不会占用其他资源
​ 3.capacity容器还可以配置最大资源数、同时运行多少应用、队列的ACL认证
​ 4.指定应用队列名称:mapreduce.job.queuename

​ ②配置yarn-site.xml

​ yarn.resourcemanager.scheduler.class

​ org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler

##### <5>运行capacity Scheduler

a.指定作业运行在哪个队列上MapReduce.job.queuename

bin/hadoop jar /share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar wordcount -Dmapreduce.jab.queuename=eng file:/NOTICE.txt file:/data

b.查看调度器

http://master:18088中的scheduler

##### <6>Fair Scheduler配置

a.去掉yarn-site.xml中的yarn.resourcemanager.scheduler.class,保持默认

b.直接运行作业的话,就创建一个以当前登陆用户名为队列名的队列运行

如果运行作业时指定了队列名,就在指定的队列中运行

c.fair-scheduler.xml

​ <allocations>

​ <defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>

​ <queue name="prod">

​ <weight>40</weight>

​ <shedulingPolicy>fifo</shedulingPolicy>

​ </queue>

​ <queue name="prod">

​ <weight>60</weight>

​ <queue name="eng"/>

​ <queue name="science"/>

​ </queue>

​ <queuePlacementPolicy>

​ <rule name="specified" create="false">

​ <rule name="primaryGroup" create="false">

​ <rule name="default" queue="dev.eng">

​ </queuePlacementPolicy>

​ </allocations>

​ 1.每个队列中都有weight权重属性,作为公平调度的依据。若是动态划分,权重都是1

​ 2.defaultQueueSchedulingPolicy属性配置队列内调度方式

​ 3.每个队列中的schedulingPolicy属性配置该队列的调度方式,优先级高

​ 4.Fair调度器会基于queuePlacementPolicy规则确定应用放到哪个队列

​ specified:应用指定队列名

​ primaryGroup:队列名为用户组的队列

​ user:队列名为用户名的队列

​ default:指定队列

​ 默认:specified、user、default

##### <7>运行Fair Scheduler

a.指定作业运行在哪个队列上MapReduce.job.queuename或不指定

bin/hadoop jar /share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar wordcount -Dmapreduce.jab.queuename=eng file:/NOTICE.txt file:/data

b.查看调度器

http://master:18088中的scheduler

### 2)资源隔离(NodeManager)

#### a.含义

NodeManager为运行的不同的Container进程提供可独立使用的计算资源,以避免它们之间相互干扰。

#### b.隔离方式

CPU隔离、内存隔离

#### c.YARN Container两种实现方式

DefaultContainerExecutor和LinuxContainerExecutor

其中DefaultContainerExecutor不支持CPU的资源隔离,LinuxContainerExecutor使用Cgroup的方式支持CPU的资源隔离,两者内存的资源隔离都是通过“线程监控”的方式实现的。

注:内存使用量超过预先定义的上限值的情况,使用Cgroup进行内存资源隔离会强制杀死进程。

#### d.内存隔离

MonitoringThread线程每隔一段时间扫描正在运行的Container进程应用程序配置参数
mapreduce.map.memory.mb:MapReduce Map Task需要使用的内存量(单位:MB)
NodeManager配置参数
yarn.nodemanager.pmem-check-enabled:NodeManager是否启用物理内存最监控,默认值:trueyarn.nodemanager.vmem-check-enabled:NodeManager是否启用虚拟内存量监控,默认值:true yarn.nodemanager.vmem-pmem-ratio:NodeManager虚拟内存与物理内存的比例,默认值2.1
yarn.nodemanager.resource.memory-mb:NodeManager最多可以使用多少物理内存,默认8G

#### e.cpu隔离(cgroup)

**<1>cgroup以组为单位隔离资源,同一个组可以使用的资源相同**

在cpu目录创建分组,yarn默认使用hadoop-yarn组作为最上层,任务运行时yarn会为每个container在hadoop-yarn里面创建一个组

yarn主要使用cpu.cfs_quota_us cpu.cfs_period_us cpu.shares3个文件

**<2>YARN使用cgroup的两种方式来控制cpu资源分配**

严格按核数隔离资源:可使用核数=cpu.cfs_quota_us/cpu.cfs period_us,根据任务申请的core数计算cpu.cfs_period_us

按比例隔离资源:按每个分组里面cpu.shares的比率来分配cpu,比如ABC三个分组,cpu.shares分别设置为102410242048,那么他们可以使用的cpu比率为1:1:2

注:创建完分组后只需要将要限制的进程的id写入tasks文件即可;若需要解除限制,在tasks文件删除即可

##### <3>cpu隔离配置

可参考博客:https://www.jianshu.com/p/e283ab7e2530

**container-executor.cfg文件**

​ yarn.nodemanager.1inux-container-executor.group=root
​ banned.users=root
​ min.user.id=1000
​ allowed.system.users=root
​ **权限设置**

​ chown root:root bin/container-executor
​ chmod 6050 bin/container-executor
​ **检测是否配置成功**
​ bin/container-executor--checksetupMapReduce原理与实践

## 4、MapReduce原理与实践

### 1)执行流程

![1543496596952](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1543496596952.png)

![1543566102082](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1543566102082.png)

### 2)案例运行方式

#### a.单机运行

<1>导入Windows支持的两个文件:winutils.exe和hadoop.dll

<2>配置HADOOP_HOME环境变量(需要重启计算机)

​ 临时配置环境变量:System.setProperty("hadoop.home.dir","%{HADOOP_HOME}%")

<3>修改NationIO类,将access0调用处直接改成true

#### b.远程调用运行

含义:Windows系统的代码直接连接Linux系统的hadoop环境进行运行,运行结果可以存到本地或HDFS服务器上

#### c.打Jar包放到hadoop集群上运行

##### <1>两种方式打jar包

​ ①eclipse打jar

![1543647879158](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1543647879158.png)

​ ![1543647912799](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1543647912799.png)

![1543648000718](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1543648000718.png)

②maven打jar包

​ ![1543647834987](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1543647834987.png)

![1543648141649](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1543648141649.png)

##### <2>放到Linux环境中运行

通过rz命令将jar包导入到Linux中,

![1543648307496](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1543648307496.png)

![1543648628191](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1543648628191.png)

#### d.Inputformat

##### <1>功能

对输入文件进行拆分,形成多个InputSplit文件,每个InputSplit对应一个map

创建RecorderReader,从InputSplit分片中读取数据供map使用

##### <2>实现类

DBInputFormat、FileInputFormat

FileInputFormat:TextInputFormat/KeyValueTextInputFormat/SequenceFileInputFormat/NLineInputFormat/CombinFileInputFormat

**FilelnputFormat:处理文件的InputFormat类**

​ 1.TextinputFormat:文件按行划分,key是这一行在文件中的偏移量,value是这一行文本

​ 2.SequenceFilelnputFormat:从sequenceFile读取,<k,v>键值对存放文件
​ 3.KeyValuelnputFormat读取普通文本文件,文件按照行分割,每一行由key和value组成,key和value的分隔符若没有指定,那么整行为key,value为空

​ 4.NLinelnputFormat是可以将N行数据划分为一个Split作为Map Task输入
​ 5.CombineFilelnputFormat合并多个小文件成为一个分片

**DBInputFormat:主要用于处理数据库数据的InputFormat类**

##### <3>SequenceFileInputFormat使用

①生成SequenceFile文件(<k,v>形式的二进制文件)

②map/reduce/驱动方法

​ jpb.setInputFormatClass(SequenceFileInputFormat.class);

#### e.输入切片(InputSplit)

##### <1>什么时候切分

client端进行切分,切分后交给YARN服务器运行

##### <2>切片中存储的内容

数据长度、数据存储位置

##### <3>切片大小

minSize = max{minSplitSize, mapred.min.splt.size}

maxSize = maperd.max.split.size

splitSize = max{minSize, min{maxSize, blockSize}}

##### <4>切片数量(mapper进程)

总文件大小/切片大小

#### f.Reduce个数

__设置__
​ 通过job.setNumReduceTasks(n)设定或配置mapreduce job.reduces
​ __建议__

合适的reduce task数量是0.95或者1.75*(nodes** mapreduce.tasktracker.reduce.tasks.maximum),其中,mapreduce.tasktracker.tasks.reduce.maximum的数量一般设置为各节点cpu core数量,即能同时计算的slot数量。对于0.95,当map结束时,所有的reduce能够立即启动;对于1.75,较快的节点结束第一轮reduce后,可以开始第二轮的reduce任务,从而提高负载均衡

#### g.OutputFormat

##### <1>功能

校验job中指定输出路径是否存在,将结果写入输出文件

##### <2>实验类

FileOutputFormat:将Reduce结果写入文件中

​ 1.TextFileOutputFormat:主要是处理普通文本的输出,也是默认实现

​ 2.SequenceFileOutputFormat:主要是处理SequenceFile的输出

​ 3.MapFileOutputFormat主要是处理MapFile(特殊的SequenceFile)的输出

​ 4.FilterOutputFormat:主要就是方便包装其他OutputFromat

​ 5.MultipleOutputs:将结果输出到多个文件中

DBOutputFormat:发送Reduce结果到SQL表中

#### h.partitioner分区器

##### <1>功能

①mapper和reduce处理逻辑之间,shuffle写入流程开始的时候

②将mapper输出结果分发到不同的reduce

##### <2>子类

①HashPartitioner(默认)

​ 将map的结果发送到相应的reduce

​ which reducer = ((key.hashCode)&Integer.MAX_VALUE)%numReduceTasks

②KeyFileBasedPartitioner

​ 基于Hash的partitioner

​ 他提供了多个人区间用于计算hash,当区间为0时KeyFileBasedPartitioner退化成HashPartitioner

③BinaryPartitioner

​ 继承Partitioner<BinaryComparable,V>

​ 计算which reducer时仅对键值K的[rightOffset,lefOffset]这个区间取hash,

​ Which reducer=(hash&Integer.MAX_VALUE)%numReduceTasks

④TotalOrderPartitioner

​ 这个类可以实现输出的全排序

​ 这个类并不是基于Hash的,对排序数据进行抽样,抽样数据进行排序生成标尺,将数据发送到对应区间ID的reduce

## 5、Shuffle流程

### 1)位置

在mapper和reduce处理逻辑只之间,连接map和reduce的纽带

### 2)功能

a.Shuffle的本义是洗牌、混洗,把一组有一定规律的数据尽量转换成一组无规律的数据,越随机越好。MapReduce中的Shuffle更像是洗牌过程的逆运转,把一组无规则的数据尽量转换成一组具有一定规则的数据。

b.从Map输出到Reduce输入的整个过程可以广义地称为Shuffle。Shuffle横跨Map端和Reduce端,在Map端包括Spill写过程,在Reduce端包括copy和sort读过程。

### 3)整体流程

![1543979979273](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1543979979273.png)

### 4)Shuffle写流程

![1543980143886](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1543980143886.png)

Spill过程包括输出、排序、溢写、合并等过程

a.map输出数据经过分区,分区完后通过collect手机到内存环形缓冲区kvbuffer

b.sort将缓冲区中的数据排序

​ <1>按分区排序

​ <2>每个分区中的数据按key进行排序

c.spill线程溢写到本地磁盘

​ 每次缓冲区满就溢写,会产生很多小文件

d.merge合并将小文件合并成大文件

#### Collect

![1544008844728](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1544008844728.png)

在缓冲区中有数据和kvmeta索引。数据区域和索引数据区域在缓冲区中是相邻不重叠的两个区域,用一个分界点来划分。分界点每次Spill之后都会更新一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长。
​ kvmeta索引是个四元组,包括:value起始位置、key起始位置、partition值、value长度。
​ 缓冲区满到一定程度,比如80%的时候就开始Spill。
​ 1.io.sort.mb:缓存map中间结果的buffer大小(MB),默认100

2.io.sort.record.percent:kvmeta索引占缓冲区大小的比例,默认0.05

3.io.sort.spill.percent:开始spil的阀值,默认0.8

注:数据区域或索引区域达到80%就spill

#### Sort

把Kvbuffer中的数据按照partition值和key两个关键字升序排序,移动的只是索引数据,排序结果是Kvmeta中数据按照partition为单位聚集在一起,同一partition内的按照key有序。

#### Spill

Spill线程从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill<spill次数>.out”的文件
​ Spil线程根据排过序的K/meta挨个partition的把<key,value>数据吐到这个文件中,一个partition在文件中对应的数据也叫段(segment)
​ 一个三元组记录某个partition对应的数据在这个文件中的索引:起始位置、原始数据长度、压缩之后的数据长度,一个partition对应一个三元组。索引先放在内存中,内存不够再在磁盘上创建一个spill<spill次数>.out.index文件

![1544009256649](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1544009256649.png)

#### Merga

![1544009283719](C:UsersAdministratorAppDataRoamingTypora ypora-user-images1544009283719.png)

### 5)Shuffle读流程

##### copy

①Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。
​ ②如果内存可以放下就直接放到内存中,每个map数据对应一块空间。当内存空间达到一定程度就启动内存merge,将数据输出到一个磁盘文件中。
​ ③如果内存放不下就把map数据直接写到磁盘上。一个map数据就建一个文件,当文件数达到一定阈值,就启动磁盘merge,合并到一个文件。
​ ④最终对内存和磁盘上的数据进行全局合并。

##### merge sort

这里使用的Merge和Map端使用的Merge过程一样。Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy-边sort,即copy和sort两个阶段是重叠而不是完全分开的。

<1>如果读取的数据在reduce内存中能放得下,就直接放到内存中。当内存空间到达一定阈值,就merge成一个磁盘文件。

<2>如果读取过来的数据内存放不下,就直接接输出到磁盘上。每个mapper过来的数据,就建一个文件。当文件到达伊宁阈值,就merge成一个打我文件。

原文地址:https://www.cnblogs.com/lyc0303/p/11662655.html