HDFS FileSystem(二) 读写操作

1. 读取HDFS文件

1.1 hdfs文件字节数组读取

 public void readFileByAPI() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.8.156:9000/");
        FileSystem fileSystem = FileSystem.get(conf);
        Path path = new Path("/user/compass/readme.txt");
 
        FSDataInputStream fsDataInputStream = fileSystem.open(path);
        byte[] bytes = new byte[1024];
        int len = -1;
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
 
        while ((len = fsDataInputStream.read(bytes)) != -1) {
            stream.write(bytes, 0, len);
        }
        fsDataInputStream.close();
        stream.close();
        System.out.println(new String(stream.toByteArray()));
 
    }

1.2 下载hdfs文件到本地

public void getFile(String dst, String local) {
     try {
         if (!filesystem.exists(new Path(dst))) {
             System.out.println("文件不存在!");
         } else {
             filesystem.copyToLocalFile(new Path(dst), new Path(local));
             System.out.println("下载成功!");
         }
     } catch (IOException e) {
         System.out.println("下载失败!");
         e.printStackTrace();
     }
 }

2. 写入HDFS文件

2.1 将字符串写入hdfs新文件中

def wirteToHdfs(hdfs: FileSystem, json: String, blockPath: String): Unit = {
    val path = new Path(blockPath)
    val fileOutputStream: FSDataOutputStream = hdfs.create(path)
    fileOutputStream.writeBytes(json)
    if (fileOutputStream != null) {
      fileOutputStream.close()
    }
  }

2.2 上传本地到文件hdfs


@Test
    public void copyFromLocalFile() throws Exception{
        Path localpath = new Path("/home/pumbaa/Downloads/Anaconda3-5.0.1-Linux-x86_64.sh");
        Path destpath = new Path("/hdfsapi/test");
        fileSystem.copyFromLocalFile(localpath, destpath);
    }
 
@Test
    public void copyFromLocalBigFile() throws Exception{
        Path localpath = new Path("/home/pumbaa/Downloads/Anaconda3-5.0.1-Linux-x86_64.sh");
        Path destpath = new Path("/hdfsapi/test");
        fileSystem.copyFromLocalFile(localpath, destpath);
        InputStream in = new BufferedInputStream(
          new FileInputStream(
                  new File("/home/pumbaa/Downloads/ideaIU-2017.3.4.tar.gz")));

        FSDataOutputStream output = fileSystem.create(new Path("/hdfsapi/test/ideaIU-2017.3.4.tar.gz"),
                new Progressable() {
                    public void progress() {
                        System.out.print("*");
                    }
                });
        IOUtils.copyBytes(in, output, 4096);
    }

3.文件操作

3.1创建目录

/**
 * 创建目录
 * @param path 创建目录的地址(例:/hadoop/)
 * @throws IOException
 */
public void mkdir(String path) throws IOException {
    //创建hdfs目录
    if(filesystem.exists(new Path(path)))
    {
        System.out.println("目录已存在");
    }
    else
    {
        boolean result=filesystem.mkdirs(new Path(path));
        System.out.println(result);
    }
}

3.2创建文件

/**
 * 创建文件
 * @param path hdfs文件地址(例:/hadoop/abc.txt)
 * @throws IOException
 */
public  void create(String path) throws IOException{
    //创建文件
    if(filesystem.exists(new Path(path)))
    {
        System.out.println("文件已存在");
    }
    else
    {
        FSDataOutputStream outputStream=  filesystem.create(new Path(path));
        System.out.println("文件创建成功");
    }
}

3.3查看文件内容

/**
 * 查看文件内容
 * @param dst hdfs文件地址(例:/hadoop/abc.txt)
 * @throws IOException
 */
public void read(String dst) throws IOException {
    if(filesystem.exists(new Path(dst)))
    {
        FSDataInputStream inputstream=filesystem.open(new Path(dst));
        InputStreamReader isr=new InputStreamReader(inputstream);
        BufferedReader br=new BufferedReader(isr);
        String str=br.readLine();
        while(str!=null){
            System.out.println(str);
            str=br.readLine();
        }
        br.close();
        isr.close();
        inputstream.close();
    }
   else
    {
        System.out.println("文件不存在");
    }
}

3.4文件重命名,移动文件

/**
 * 将dst1重命名为dst2,也可以进行文件的移动
 * @param oldpath 旧名
 * @param newpath 新名
 */
public void moveFile(String oldpath, String newpath) {
    Path path1 = new Path(oldpath);
    Path path2 = new Path(newpath);
    try {
        if (!filesystem.exists(path1)) {
            System.out.println(oldpath + " 文件不存在!");
            return;
        }
        if (filesystem.exists(path2)) {
            System.out.println(newpath + "已存在!");
            return;
        }
        // 将文件进行重命名,可以起到移动文件的作用
        filesystem.rename(path1, path2);
        System.out.println("文件已重命名!");
    } catch (IOException e) {
        e.printStackTrace();
    }
}

3.5显示目录下所有文件

/**
 * 显示目录下所有文件
 * @param dst
 */
public void listStatus(String dst) {
    try {
        if (!filesystem.exists(new Path(dst))) {
            System.out.println("目录不存在!");
            return;
        }
        // 得到文件的状态
        FileStatus[] status = filesystem.listStatus(new Path(dst));
        for (FileStatus s : status) {
            System.out.println(s.getPath().getName());
        }

    } catch (IllegalArgumentException | IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

3.6删除hdfs文件

/**
 * 删除hdfs中的文件
 * @param dst
 */
public void deleteFile(String dst) {
    try {
        if (!filesystem.exists(new Path(dst))) {
            System.out.println("文件不存在!");
        } else {
            filesystem.delete(new Path(dst), true);
            System.out.println("删除成功!");
        }
    } catch (IOException e) {
        System.out.println("删除失败!");
        e.printStackTrace();
    }
}
原文地址:https://www.cnblogs.com/chong-zuo3322/p/13181069.html