[HDFS_4] HDFS 的 Java 应用开发


 0. 说明

   在 IDEA下 进行 HDFS 的 Java 应用开发

  通过编写代码实现对 HDFS 的增删改查操作


1. 流程

  1.1 在项目下新建 Moudle

  略

  1.2 为 Moudle 添加 Maven 框架支持

  略

  1.3 添加 Maven 依赖

    <dependencies>
        <!-- Hadoop Client依赖 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>

        <!-- 单元测试依赖 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

    </dependencies>

  1.4 将 Hadoop/etc/ha 目录下的 [core-site.xml] [hdfs-site.xml] [log4j.properties] 存入 resources 中

  


2. 代码编写

package hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

/**
 * IDEA 下测试 HDFS 的增删改查
 */
public class TestHDFS {

    // 1. 测试读取
    @Test
    public void testRead() throws IOException {

        // 初始化配置文件
        Configuration conf = new Configuration();

        // 初始化文件系统
        FileSystem fs = FileSystem.get(conf);

        // 初始化路径
        Path p = new Path("/a.txt");

        // 通过文件系统获取输入流
        // FSDataInputStream 是 inputStream 的装饰流,可以通过普通流方式操纵 fis
        FSDataInputStream fis = fs.open(p);

        int len = 0;
        byte[] buf = new byte[1024];

        while ((len = fis.read(buf)) != -1) {
            System.out.println(new String(buf, 0, len));
        }
        fis.close();
    }

    // 2. 测试读取并通过 IOUtils 拷贝文件到本地
    @Test
    public void testRead2() throws Exception {

        // 初始化配置文件
        Configuration conf = new Configuration();
        // 初始化文件系统
        FileSystem fs = FileSystem.get(conf);

        // 初始化路径
        Path p = new Path("/a.txt");

        // 通过文件系统获取输入流
        // FSDataInputStream 是 inputStream 的装饰流,可以通过普通流方式操纵 fis
        FSDataInputStream fis = fs.open(p);

        FileOutputStream fos = new FileOutputStream("D:/1.txt");

        // 通过 IOUtils 拷贝文件
        IOUtils.copyBytes(fis, fos, 1024);

        fis.close();
        fos.close();
        System.out.println("ok");

    }

    // 3. 测试写文件,将本地文件写入到 HDFS 中
    @Test
    public void testwrite() throws IOException {

        // 设置系统用户名
        System.setProperty("HADOOP_USER_NAME", "centos");

        // 初始化配置文件
        Configuration conf = new Configuration();
        // 初始化文件系统
        FileSystem fs = FileSystem.get(conf);

        // 获得输入流
        FileInputStream fis = new FileInputStream("E:/p_data/test/customers.txt");

        // 初始化路径
        Path pout = new Path("/b.txt");

        // 通过文件系统获取输出流
        // FSDataOutputStream 是 outputStream 的装饰流,可以通过普通流方式操纵 fos
        FSDataOutputStream fos = fs.create(pout);

        // 通过 IOUtils 拷贝文件
        IOUtils.copyBytes(fis, fos, 1024);

        fis.close();
        fos.close();
        System.out.println("ok");

    }

    // 创建文件夹
    @Test
    public void testMkdir() throws IOException {

        // 设置系统用户名
        System.setProperty("HADOOP_USER_NAME", "centos");

        // 初始化配置文件
        Configuration conf = new Configuration();
        // 初始化文件系统
        FileSystem fs = FileSystem.get(conf);

        boolean b = fs.mkdirs(new Path("/aaa"));

        System.out.println(b);


    }

    // 删除文件夹
    @Test
    public void testDelete() throws IOException {

        // 设置系统用户名
        System.setProperty("HADOOP_USER_NAME", "centos");

        // 初始化配置文件
        Configuration conf = new Configuration();
        // 初始化文件系统
        FileSystem fs = FileSystem.get(conf);

        boolean b = fs.delete(new Path("/aaa"),true);

        System.out.println(b);
    }

    // 文件末尾追加文件
    @Test
    public void testAppend() throws IOException {

        // 设置系统用户名
        System.setProperty("HADOOP_USER_NAME", "centos");

        // 初始化配置文件
        Configuration conf = new Configuration();
        // 初始化文件系统
        FileSystem fs = FileSystem.get(conf);

        // 通过文件系统获取输出流
        // FSDataOutputStream 是 outputStream 的装饰流,可以通过普通流方式操纵 fos
        FSDataOutputStream fos = fs.append(new Path("/a.txt"));

        // 通过文件系统获取输入流
        // FSDataInputStream 是 inputStream 的装饰流,可以通过普通流方式操纵 fis
        FileInputStream fis = new FileInputStream("E:/p_data/add.txt");

        // 通过 IOUtils 拷贝文件
        IOUtils.copyBytes(fis,fos,1024);

        fis.close();
        fos.close();
    }

    // 通过递归列出指定文件夹下的文件或文件夹信息
    public static void testList(String path) {
        try {
            // 初始化配置文件
            Configuration conf = new Configuration();
            // 初始化文件系统
            FileSystem fs = FileSystem.get(conf);

            FileStatus[] statuses = fs.listStatus(new Path(path));

            for (FileStatus status : statuses) {
                if (status.isDirectory()) {
                    path = status.getPath().toString();
                    System.out.println(path);
                    testList(path);
                } else {
                    System.out.println(status.getPath().toString());
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        testList("/");
    }

}

原文地址:https://www.cnblogs.com/share23/p/9779418.html