hadoop1

hadoop-01

  • 大数据:指无法在一定时间范围内用常规软件工具进行捕捉,管理和处理的数据集合,是需要新的处理模式化才能具有更强决策力,洞察发现力和流程化能力的海量,高增长率和多样化的信息资产。

  • 分为:

    HDFS:海量数据的存储
    mapreduce:分布式运算框架
    YARN:资源调度平台和监控平台
    Commons:技术支持(大量工具类)
    

1.序列化

  • 将对象转换成二进制。(对象--->持久化过程)。而反序列化是将二进制转换成对象(内存)。

    应用场景:持久化或者活化,网络之间传输
    
  • 示例:

    package hadoop01.test;
    
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    
    import hadoop01.User;
    
    // @SuppressWarnings去除代码中warinning警告
    @SuppressWarnings("resource")
    public class TestOne {
    	public static void main(String[] args) throws Exception {
    		User user = new User();
    		user.setUid(1);
    		user.setName("xjk");
    		// 持久化 序列化 写到磁盘
    		ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("J:\java大数据\大数据\a.txt"));
    		// 对象写入
    		oos.writeObject(user);
    		oos.close();
    		// 反序列化,从磁盘中读取,转换成Java对象
    		ObjectInputStream ois = new ObjectInputStream(new FileInputStream("J:\java大数据\大数据\a.txt"));
    		User u = (User)ois.readObject();
    		System.out.println(u.name);
    	}
    }
    
    
  • 而hadoop没有使用jdk原生序列化,而是采用写入对象内单个值方式。相对于写入整个对象来说,占用磁盘资源更少。如下示例:

    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    
    import hadoop01.User;
    
    public class TestTwo {
    	public static void main(String[] args) throws Exception {
    		User user = new User();
    		user.setUid(1);
    		user.setName("xjk");
    		// 持久化 序列化 写到磁盘
    		ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("J:\java大数据\大数据\b.txt"));
    		// 写入
    		oos.writeInt(user.getUid());
    		oos.writeUTF(user.getName());
    		oos.close();
    		ObjectInputStream ois = new ObjectInputStream(new FileInputStream("J:\java大数据\大数据\b.txt"));
    		int uid = ois.readInt();
    		String name =ois.readUTF();
    		System.out.println(uid + "===" + name);
    		ois.close();
    	}
    }
    

    上面方式将uid和name单独写入,占用磁盘空间15K,而写入对象占用空间有70K。

2.服务

  • 自定义socket服务:客户端发送字符串:abc:toUpCase,服务端将abc转大写

    • 服务端
    package com.xjk.utils;
    
    import java.io.InputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /*
     * 监听端口
     * 接收请求数据
     * 根据自定义的协议解析请求数据
     * 执行业务
     * */
    public class HdpServer {
    	public static void main(String[] args) throws Exception {
    		ServerSocket ss = new ServerSocket(9996);
    		System.out.println("服务端启动...");
    		while (true) {
    			Socket socket = ss.accept();
    			System.out.println("接收数据..");
    			ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
    			ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
    			// 拿到接收数据
    			String request = (String)ois.readObject();
    			System.out.println(request);
    			// 根据 : 分割
    			String[] split = request.split(":");
    			String word = split[0];
    			// 判断调用方法
    			String methodName = split[1];
    			if ("toUpCase".equals(methodName)) {
    				MyUtils mu = new MyUtils();
    				String newWord = mu.toUpCase(word);
    				oos.writeObject(newWord);
    			}
    			oos.close();
    			ois.close();
    			socket.close();
    		}
    	}
    }
    
    
    • 客户端
    package com.xjk.client;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.Socket;
    import java.net.UnknownHostException;
    
    /*
     * 根据协议发送请求数据
     * 接收响应结果
     * */
    public class HdpClient {
    	public static void main(String[] args) throws Exception {
    		Socket socket = new Socket("localhost", 9996);
    		System.out.println("客户端启动...");
    		ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
    		ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
    		oos.writeObject("abc:toUpCase");
    		System.out.println("发送数据...");
    		String res = (String)ois.readObject();
    		System.out.println(res);
    		oos.close();
    		ois.close();
    	}
    }
    
    

3.分布式文件系统基本设计

  • 分布式存储(文件)系统HDFS.用于存储海量数据,数据存储在集群中,它可以动态扩容。

  • 当你有源数据存储在机器上。为了数据安全数据会存储副本,存储副本在不同机器上,存储副本机器上存储着全部或部分数据。比如你的一台副本机器挂掉,它会将数据从另一个机器上拷贝过来。当拷贝过程中出现网络阻塞情况下。为了防止这个现象发生,文件存储会切块(block块)一般默认128M.每个都有自己编号,具有唯一标识。

  • 磁盘存储数据是通过寻道找的。如果寻道消耗时间10ms,最好处理数据100倍寻道操作。也就是1000ms,1000ms将数据传输。一般磁盘传输速率一般是100M/S,所以文件存储会切块一般为128M。

    这样在整个系统中,有个节点去管理数据存储的源数据,一般记录数据信息有:
    文件名  block快唯一标识 起始和结束偏移量,文件大小 存储在哪台机器上,副本存储个数等等。
    
    

    最终,它会对外提供一套虚拟访问目录(虚拟目录和真实文件存储的映射)。

  • 为了保证数据高容错:存储文件的副本,并且内部可以进行容错处理(拷贝到相应机器)。

  • 文件在存储时候会切分存储大小(物理切换) 默认128M。

  • namenode管理存储源数据,接收客户端请求(上传和下载),返回元数据。

  • 真正存储数据的节点datanode。

  • namenode管理多个datanode。datanode会定时向namenode汇报。

4.NameNode

  • 用于记录文件存储的元数据(NameNode将这些数据的元数据信息记录在内存中,并且将这些元数据信息定期的序列化到本地磁盘上)

  • 记录用户存储的文件的大小、切分的块数、每一块的副本数和存储在DataNode上的位置

  • NameNode中的元数据目录下的fsimage文件是用来存储元数据的

  • NameNode中的元数据目录下的VERSION文件里面存储的是一系列的ID,相当于该集群的标识。

1 接收客户端的请求 , 分配数据存储的节点位置
2 管理集群中存储数据的元数据
3 容错的处理 , 当有机器宕机.检查元数据 , 复制出来副本数据
4 接收datanode的心跳感应 , 接收datanode汇报的数据存储情况

5.DataNode

  • 用于存储用户上传的文件数据

  • 数据是存储在该DataNode软件所在的服务器的本地磁盘中

  • DataNode中的数据保存目录下的VERSION文件里面存储的一系列ID跟NameNode里面的完全一致(说明他们是属于同一个集群的)

    1 存储数据
    2 向namenode汇报数据存储情况
    3 发送心跳机制
    

6.HDFS安装

  • 前提条件:

    JDK
    集群中各个机器可以通信(NAT),互相可以ping通
    集群中各个机器互相免密
    ip 主机名 域名映射
    
    主机名 ip地址 角色
    linux01 10.0.0.134 namenode/datanode
    linux02 10.0.0.131 datanode/secondaryNameNode
    linux03 10.0.0.132 datanode
  • 安装:

    1下载jdk 略
    2安装hadoop  解压相应目录:tar -zxvf hadoop-2.8.5.tar.gz
    # 解压比较慢
    3.cd到hadoop
    bin		#客户端交互命令
    etc		#配置文件
    include
    lib
    libexec
    LICENSE.txt
    NOTICE.txt
    README.txt
    sbin	#启停命令 
    share	#文档jar包
    
  • 配置:

    在  hadoop-2.8.5/etc/hadoop 内进行配置:
    1.JAVA_HOME配置
    yum安装默认路经:/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/
    vim hadoop-env.sh
    	export JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/
    
    2.hdfs-site.xml配置  指定namenode所在的机器 和 元数据 和 数据存储目录
    vim hdfs-site.xml
    	dfs.namenode.rpc-address 配置namenode地址
    	dfs.namenode.name.dir 配置namenode存储地址
    	dfs.datanode.data.dir datanode存储地址
    	dfs.namenode.secondary.http-address 保证数据安全性
    
  • 配置参数:

    #linux为例: vim hdfs-site.xml
    
    <property>
      <name>dfs.namenode.rpc-address</name>
      <value>linux01:9000</value>
      <description></description>
    </property>
    
    <property>
      <name>dfs.namenode.name.dir</name>
      <value>/opt/hdp/name/</value>
      <description></description>
    </property>
    
    <property>
      <name>dfs.datanode.data.dir</name>
      <value>/opt/hdp/data/</value>
      <description></description>
    </property>
    
    <property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>linux02:50090</value>
      <description></description>
    </property>
    
  • scp命令将hadoop安装包分发到集群中各个机器上

    scp -r hadoop-2.8.5/ linux02:$PWD
    scp -r hadoop-2.8.5/ linux03:$PWD
    

    拷贝过程很长,可以将share中doc文件删除后拷贝,里面有很多小文件文档导致拷贝速度变慢:rm -rf /opt/hdp/hadoop-2.8.5/share/doc

  • 启动

    # 启动在hadoop 下bin目录下:
    # 启动之前一定要先初始化namenode,执行如下命令:
    ./hadoop namenode -format
    

    可以看到在/opt/hdp/name/下有一个current文件里面有版本号。

    • 启动目录在sbin中:
    /opt/hdp/hadoop-2.8.5/sbin
    # 启动namenode
    [root@linux01 sbin]./hadoop-daemon.sh start namenode
    
    # 通过jps查看启动的namenode
    [root@linux01 sbin]# jps
    9666 Jps
    9565 NameNode
    
    • 它提供了一个web页面可以使我们访问:通过访问linux01对应ip的url:http://10.0.0.134:50070/显示web页面

    它里面有启动时间,相应版本号等等。。。
    
  • 启动datanode:

    ./hadoop-daemon.sh start datanode
    [root@linux01 sbin]# jps
    9964 Jps
    9565 NameNode
    9919 DataNode# 可以看到启动datanode
    
  • web页面中点击Datanode如下图:此时有一个datanode,包括它存储空间,地址,版本等信息

  • 此时我们在linux02启动

    [root@linux02 sbin]# ./hadoop-daemon.sh start datanode
    
  • linux03也是如此

    [root@linux03 sbin]# ./hadoop-daemon.sh start datanode
    

    注意:如果启动不成功,首先查找配置:hadoop-env.sh中JAVA_HOME路径是否正确, vim hdfs-site.xml配置是否正确,启动前删除原有的data文件。克隆机器会出现current内的版本号一致问题,此时删除,重新启动即可。

  • 节点停止:

    [root@linux03 sbin]# ./hadoop-daemon.sh stop datanode
    # 停止会有一个很长延迟。大概10分钟。
    

    namenode剔除宕机的datanode节点策略是:datanode每隔3s触发心跳访问namenode,当过去30秒都没有触发,namenode会过5分钟去看当前datanode是否死去,如果没有回应,则再过5分钟再次验证datanode是否死去。再没有回应则会剔除当前datanode。

7.HDFS客户端命令介绍

  • bin/hdfs dfs有如下客户端交互命令:

    	[-appendToFile <localsrc> ... <dst>]
    	[-cat [-ignoreCrc] <src> ...]
    	[-checksum <src> ...]
    	[-chgrp [-R] GROUP PATH...]
    	[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
    	[-chown [-R] [OWNER][:[GROUP]] PATH...]
    	[-copyFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
    	[-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
    	[-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] <path> ...]
    	[-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>]
    	[-createSnapshot <snapshotDir> [<snapshotName>]]
    	[-deleteSnapshot <snapshotDir> <snapshotName>]
    	[-df [-h] [<path> ...]]
    	[-du [-s] [-h] [-x] <path> ...]
    	[-expunge]
    	[-find <path> ... <expression> ...]
    	[-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
    	[-getfacl [-R] <path>]
    	[-getfattr [-R] {-n name | -d} [-e en] <path>]
    	[-getmerge [-nl] [-skip-empty-file] <src> <localdst>]
    	[-help [cmd ...]]
    	[-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [<path> ...]]
    	[-mkdir [-p] <path> ...]
    	[-moveFromLocal <localsrc> ... <dst>]
    	[-moveToLocal <src> <localdst>]
    	[-mv <src> ... <dst>]
    	[-put [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
    	[-renameSnapshot <snapshotDir> <oldName> <newName>]
    	[-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...]
    	[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
    	[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
    	[-setfattr {-n name [-v value] | -x name} <path>]
    	[-setrep [-R] [-w] <rep> <path> ...]
    	[-stat [format] <path> ...]
    	[-tail [-f] <file>]
    	[-test -[defsz] <path>]
    	[-text [-ignoreCrc] <src> ...]
    	[-touchz <path> ...]
    	[-truncate [-w] <length> <path> ...]
    	[-usage [cmd ...]]
    
    Generic options supported are
    -conf <configuration file>     specify an application configuration file
    -D <property=value>            use value for given property
    -fs <file:///|hdfs://namenode:port> specify default filesystem URL to use, overrides 'fs.defaultFS' property from configurations.
    -jt <local|resourcemanager:port>    specify a ResourceManager
    -files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
    -libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
    -archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.
    
    The general command line syntax is
    command [genericOptions] [commandOptions]
    
  • 用法:

    # 查看集群存储能力
    ./hdfs dfs -df
    
    # 在linux02上传文件: 将/root/test_hdp.txt 上传到linux01机器上
    [root@linux02 bin]#hdfs dfs -put /root/test_hdp.txt hdfs://linux01:9000/
    

    如图:可以在客户端看到:test_hdp.txt上传信息:存储节点,分块个数等等。

原文地址:https://www.cnblogs.com/xujunkai/p/13910882.html