HDFS的JavaAPI操作

原理层面暂时了解不深,只能通过简单的Java代码对HDFS系统进行基本的简单操作,在此做记录如下:

一、HDFS基本操作

1、获取FileSystem

首先需要获取HDFS这个分布式文件系统,JAVA的 org.apache.hadoop.fs 包下的FileSystem类便是为文件系统设计的。我们的目标便是实例化出HDFS的FileSystem对象。

获取FileSystem有多种方式,这里为入门先介绍最简单的一种。

    public void getFileSystem2() throws IOException, URISyntaxException, InterruptedException {
        /* 第一个参数 HDFS 的主机名 + 端口
         * 第二个参数:一个Configuration 对象,配置信息的对象
     * Configuration configuration = new Configuration();
      * configuration.set("fs.defaultFS","hdfs://bigdata1:8020"); * 第三个参数:指定用户,root用户为超级管理员 , 无特殊情况下尽量不加该参数
*/ FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata1:8020"), new Configuration(), "root"); System.out.println(fileSystem); // 输出 DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_373282607_1, ugi=11655 (auth:SIMPLE)]] // 关闭 fileSystem.close(); }

2、创建文件 / 文件夹

    public void mkdirs() throws URISyntaxException, IOException {
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata1:8020/a.txt"), new Configuration());
        // 创建文件夹
        boolean bl = fileSystem.mkdirs(new Path("/aaa2/bbb/ccc"));
        //创建文件
        fileSystem.create(new Path("/aaa/aaa.txt"));
        // 两个创建方法都为递归创建
        System.out.println(bl);
        fileSystem.close();
    }

3、文件下载

    public void downloadFile() throws URISyntaxException, IOException {
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata1:8020/a.txt"), new Configuration());
        // 第一个参数,y要下载的HDFS文件路径
        // 第二个参数,下载到本机的目录(不是虚拟机的主机)
        // Path(String str),路径类
        fileSystem.copyToLocalFile(new Path("/a.txt"), new Path("D://a2.txt"));
        fileSystem.close();
    }

4、文件上传

    public void uploadFile() throws URISyntaxException, IOException {
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://bigdata1:8020/a.txt"), new Configuration());
        /*
        * 第一个参数:本地文件路径
        * 第二个参数:要上传的HDFS目录
        * */
        fileSystem.copyFromLocalFile(new Path("D://b.txt"),new Path("/"));
        fileSystem.close(); 
    }

5、创建文件并写入数据

        // 操作文件系统通过 FSDataOutputStream 流,通过fs.create方法得到流对象
        FSDataOutputStream fsDataOutputStream = fs.create(new Path("\user\hadoop\test\data.dat"));
        fsDataOutputStream.write("Hello,BigData1111111111111".getBytes()); // 默认字节流
        // 关闭流
        fsDataOutputStream.close();

6、递归目录

    @Test
    public void listFiles() throws IOException {
        // 调用方法listFiles 获取一个目录下的文件信息,为一个迭代器对象
        // 第一个参数:指定目录
        // 第二个参数,是否迭代获取
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/user/hadoop"), true);
        while (listFiles.hasNext()) {
            // 每个迭代器元素为一个 LocatedFileStatus 对象,存储着文件的信息
            LocatedFileStatus fileStatus = listFiles.next();

            System.out.println("Name: " + fileStatus.getPath().getName());
            System.out.println("Len: " + fileStatus.getLen());
            System.out.println("BlockSize: " + fileStatus.getBlockSize());
            System.out.println("Replication: " + fileStatus.getReplication());

            BlockLocation[] blockLocations = fileStatus.getBlockLocations();

            for (BlockLocation blk : blockLocations) {
                System.out.println("blk-length:" + blk.getLength() + " - blk-offset: " + blk.getOffset());

                String[] hosts = blk.getHosts();
                for (String host : hosts) {
                    System.out.println("host: " + host);
                }
            }
            System.out.println("-------------------end...----------------------");
        }
    }

 二、工具类的实现

上述代码可以看出有些代码,例如文件系统的获取,关闭等,每个函数都重复使用,比较繁琐,我们可以封装起来,作为自己的工具类utils,方便使用。

package utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.io.IOException;

public class HDFSUtils {
    
    // 为方便管理,可将以下字段拿出作为单独的成员变量,便于后期修改
    public static final String DEFAULT_FS_NAME = "fs.defaultFS";
    public static final String DEFAULT_FS_VALUE = "hdfs://bigdata1:8020";
    public static final String HADOOP_USER_NAME = "HADOOP_USER_NAME";
    public static final String HADOOP_USER_VALUE = "root";

    // 获取FileSystem对象
    public static FileSystem getFS(){
        Configuration configuration = new Configuration();
        configuration.set(DEFAULT_FS_NAME, DEFAULT_FS_VALUE);
        System.setProperty(HADOOP_USER_NAME, HADOOP_USER_VALUE);
        FileSystem fs = null;
        try {
            fs = FileSystem.get(configuration);
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("获取系统文件对象失败。");
        }
        return fs;
    }

    // 关闭资源
    public static void closeFS(FileSystem fs){
        if (fs != null){
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println("关闭系统文件对象失败。");
            }
        }
    }
}

但是有了工具类,每次工程测试,都要创建fileSystem对象和关闭资源,有些繁琐,我们可以在使用Test测试类的 before 和 after 注解,简化代码。

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import utils.HDFSUtils;

import java.io.IOException;

public class Test01 {
    FileSystem fs = null;

    // 在所有功能执行前拿到fileSystem对象即可
    @Before // 在程序测试前执行,只执行一次,可以用于工程的初始化
    public void init(){
        fs = HDFSUtils.getFS();
    }

    // 在所有功能执行完毕后能够关闭FileSystem对象
    @After //在程序测试后执行,只执行一次,可以用于关闭资源等收尾工作
    public void close(){
        HDFSUtils.closeFS(fs);
    }

    // 要测试的类直接加上 @Test 注解即可。
    @Test
    public void mkdir() throws IOException {
        boolean b = fs.mkdirs(new Path("\user\hadoop\test"));
        if (b) {
            System.out.println("创建成功");
        } else {
            System.out.println("创建失败");
        }
    }
}

 三、使用流操作文件系统

我们这里使用第三方工具类IOUtils实现文件流的输入输出操作。

1、文件上传

    @Test
    public void uploadFile() throws IOException {
        // 创建 hdfs 文件输出流,通过此 流 写入文件
        FSDataOutputStream fsDataOutputStream = fs.create(new Path("/user/hadoop/dongao1.txt")); // 一个新路径,创建并写入该文件
        // 获取本地文件输入流,将本地文件读入内存
        FileInputStream inputStream = new FileInputStream("D:\dongao.txt");
        // 通过IOUtils 的 copyBytes 执行读写
        // 参数: 输入流,输出流,每次读取字节的个数
        IOUtils.copyBytes(inputStream, fsDataOutputStream,1024);

        // 调用 IOUtils 的关闭流方法
        IOUtils.closeStream(fsDataOutputStream);
        IOUtils.closeStream(inputStream);
    }

2、文件下载

    @Test
    public void downloadFile() throws IOException {
        // 先打开一个HDFS中存在的文件输入流
        FSDataInputStream inputStream = fs.open(new Path("/user/hadoop/dongao.txt"));
        // 获取一个文件的输出流,将内容写入本地
        FileOutputStream outputStream = new FileOutputStream("D:\董奥.txt");
        // 执行读写,读取输入流的内容,写入到输出流,单个文件大小4096个字节
        IOUtils.copyBytes(inputStream,outputStream,4096);
        outputStream.flush(); // 手动刷新

        IOUtils.closeStream(outputStream);
        IOUtils.closeStream(inputStream);
    }

3、查看文件内容

    @Test
    public void cat() throws IOException {
        FSDataInputStream inputStream = fs.open(new Path("/user/hadoop/dongao.txt"));
        IOUtils.copyBytes(inputStream, System.out, 4096);
        IOUtils.closeStream(inputStream);
    }
原文地址:https://www.cnblogs.com/dongao/p/13806288.html