hafs的java_api(转载)

转载自:http://www.cuiweiyou.com/1405.html

public class TestHdfs {
        private FileSystem hdfs;
        private Configuration conf;
        
      @Test
      // 1 创建空文件
      public void testCreateNewFile() throws Exception {
          conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");    // core-site.xml中到配置
        conf.set("mapred.jop.tracker", "localhost:9001");
        
        hdfs = FileSystem.get(conf);
        //创建空文件
        hdfs.createNewFile(new Path("hdfs:/newfile.txt"));
        hdfs.close();
      }
      
      @Test
      // 2 遍历指定目录及其子目录内的文件
      public void testListFiles() throws Exception {
        // 1.配置器
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapred.jop.tracker", "localhost:9001");
        // 2.文件系统
        hdfs = FileSystem.get(conf);
        // 3.遍历HDFS上的文件
        RemoteIterator<LocatedFileStatus> fs = hdfs.listFiles(new Path("hdfs:/"), true);
        while(fs.hasNext()){
          System.out.println(fs.next());
        }
        hdfs.close();
      }
     
      @Test
      // 3 查看目录/文件状态
        public void listStatus() throws Exception {
            // 1.创建配置器 
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapred.jop.tracker", "localhost:9001");
        
        // 2.创建文件系统 
        hdfs = FileSystem.get(conf);
        // 3.遍历HDFS上的文件和目录 
        FileStatus[] fs = hdfs.listStatus(new Path("hdfs:/")); 
        if (fs.length > 0) { 
          for (FileStatus f : fs) { 
            showDir(f);
          }
        }else{
            System.out.println("nothing...");
        }
        hdfs.close();
        }
      
      // 4 判断是目录,还是文件
      private void showDir(FileStatus fs) throws Exception {
        Path path = fs.getPath();
        System.out.println(path);
        // 如果是目录
        if (fs.isDirectory()) {
          FileStatus[] f = hdfs.listStatus(path);
          if (f.length > 0) {
            for (FileStatus file : f) {
              showDir(file);
            }
          }
        }
        // 如果是文件
        if (fs.isFile()){
            //fs.getXXX();
          long time = fs.getModificationTime();   
          System.out.println("HDFS文件的最后修改时间:"+new Date(time));
        }
      }
      
      @Test
      // 5 判断存在
      public void testExists() throws Exception {
        // 1.创建配置器  
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapred.jop.tracker", "localhost:9001");
        //2.创建文件系统  
        hdfs = FileSystem.get(conf);  
        //3.创建可供hadoop使用的文件系统路径
        Path file = new Path("hdfs:/test.txt");  
        // 4.判断文件是否存在(文件目标路径)  
        System.out.println("文件存在:" + hdfs.exists(file));
        hdfs.close();
      }
      
      @Test
      // 6 向文件写入数据
      public void testAppend() throws Exception {
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapred.jop.tracker", "localhost:9001");
        /* 在hdfs-site.xml中配置了没啥用
    org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): 
        Failed to APPEND_FILE /newfile.txt for DFSClient_NONMAPREDUCE_-610333487_1 on 127.0.0.1 
        because this file lease is currently owned by DFSClient_NONMAPREDUCE_1541357537_1 on 127.0.0.1
        */
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy" ,"NEVER" );
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable" ,"true" );
        /*
    java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. 
    (Nodes: current=[DatanodeInfoWithStorage[127.0.0.1:50010,DS-b1c29ca4-24f7-4447-a12b-5ae261663431,DISK]], 
    original=[DatanodeInfoWithStorage[127.0.0.1:50010,DS-b1c29ca4-24f7-4447-a12b-5ae261663431,DISK]]). 
    The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
             */
        conf.set("dfs.support.append", "true");
        
            hdfs = FileSystem.get(conf);
            Path path = new Path("hdfs:/newfile.txt");
            if (!hdfs.exists(path)) {
                hdfs.create(path);
                hdfs.close();
                hdfs = FileSystem.get(conf);
            } 
            
            FSDataOutputStream out = hdfs.append(path);
            out.write("4每次执行次方法只能有一个write语句?!!!
".getBytes("UTF-8"));
            out.write("5每次执行次方法只能有一个write语句?!!!
".getBytes("UTF-8"));
            out.close();
            
            hdfs.close();
      }
      
      @Test
      // 7 读文件
      public void testOpen() throws Exception {
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapred.jop.tracker", "localhost:9001");
        
        hdfs = FileSystem.get(conf);
        Path path = new Path("hdfs:/newfile.txt");
        FSDataInputStream is = hdfs.open(path);
        FileStatus stat = hdfs.getFileStatus(path);
        byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
        is.readFully(0, buffer);
        is.close();
        hdfs.close();
        System.out.println(new String(buffer));
      }
      
      @Test
      // 8 重命名文件
      public void testRename() throws Exception {
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapred.jop.tracker", "localhost:9001");
        hdfs = FileSystem.get(conf);
        
        //重命名:fs.rename(源文件,新文件)
        boolean rename = hdfs.rename(new Path("/newfile.txt"), new Path("/test.txt"));
        System.out.println(rename);
        
        hdfs.close();
      }
      
      @Test
      // 8 删除目录/文件
      public void testDelete() throws Exception {
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapred.jop.tracker", "localhost:9001");
        hdfs = FileSystem.get(conf);
        
        //判断删除(路径,true=非空也删除。false=非空时不删除,抛RemoteException、IOException异常)
        boolean delete = hdfs.delete(new Path("hdfs:/test.txt"), true);
        System.out.println("执行删除:"+delete);
        //FileSystem关闭时执行
        boolean exit = hdfs.deleteOnExit(new Path("/out.txt"));
        System.out.println("执行删除:"+exit);
        
        hdfs.close();
      }
      
        @Test
        // 10 创建目录/文件。父目录不存在则直接创建
      public void testCreate() throws Exception {
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapred.jop.tracker", "localhost:9001");
        hdfs = FileSystem.get(conf);
        
        // 使用HDFS数据输出流(写)对象 在HDSF上根目录创建一个文件夹,其内再创建文件
        FSDataOutputStream out = hdfs.create(new Path("hdfs:/vigiles/eminem.txt"));
        // 在文件中写入一行数据,必须使用UTF-8
        out.write("痞子阿姆,Hello !".getBytes("UTF-8"));
        out.flush();
        
        out = hdfs.create(new Path("/vigiles/alizee.txt"));
        out.write("艾莉婕,Hello !".getBytes("UTF-8"));
        
        out.close();
        
        FSDataInputStream is = hdfs.open(new Path("hdfs:/vigiles/alizee.txt"));
        FileStatus stat = hdfs.getFileStatus(new Path("hdfs:/vigiles/alizee.txt"));
        byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
        is.readFully(0, buffer);
        is.close();
        hdfs.close();
        System.out.println(new String(buffer));
      }
        
      @Test
      // 11 创建目录
      public void testMkdir() throws Exception {
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapred.jop.tracker", "localhost:9001");
        hdfs = FileSystem.get(conf);
        //创建目录
        hdfs.mkdirs(new Path("hdfs:/eminem1"));
        hdfs.mkdirs(new Path("hdfs:/eminem2"));
        hdfs.mkdirs(new Path("hdfs:/eminem3"));
        hdfs.close();
      }
      
      @Test
      // 12 文件备份状态
      public void testGetFileBlockLocations() throws Exception {
        //1.配置器
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapred.jop.tracker", "localhost:9001");
        //2.文件系统
        hdfs = FileSystem.get(conf);
        //3.已存在的,必须是文件
        Path path = new Path("hdfs:/vigiles/alizee.txt");
        //4.文件状态
        FileStatus status = hdfs.getFileStatus(path);
        //5.文件块
        //BlockLocation[] blockLocations = fs.getFileBlockLocations(status, 0, status.getLen());  //方法1,传入文件的FileStatus
        BlockLocation[] blockLocations = hdfs.getFileBlockLocations(path, 0, status.getLen());  //方法2,传入文件的Path 
        int blockLen = blockLocations.length;
        System.err.println("块数量:"+blockLen);  //如果文件不够大,就不会分块,即得到1
        // 遍历文件块到信息
        for (int i = 0; i < blockLen; i++) {
          //得到块文件大小
          long sizes = blockLocations[i].getLength();
          System.err.println("块大小:"+sizes);
          
          //按照备份数量得到全部主机名
          String[] hosts = blockLocations[i].getHosts();
          for (String host : hosts) {
            System.err.println("主机名:"+host);
          }
          
          //按照备份数量得到全部主机名
          String[] names = blockLocations[i].getNames();
          for (String name : names) {
            System.err.println("IP:"+ name);
          }
        }
        
        hdfs.close();
      }
     
      @Test
      // 13 上传文件
      public void testCopyFromLocalFile() throws Exception {
        // 1.创建配置器  
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapred.jop.tracker", "localhost:9001");
        //2.创建文件系统  
        hdfs = FileSystem.get(conf);  
        //3.创建可供hadoop使用的文件系统路径  
        Path src = new Path("file:/root/xx.txt"); //本地目录/文件  
        Path dst = new Path("hdfs:/");  //目标目录/文件 
        // 4.拷贝本地文件上传(本地文件,目标路径)  
        hdfs.copyFromLocalFile(src, dst);  
        System.out.println("文件上传成功至:" + conf.get("fs.default.name"));  
        // 5.列出HDFS上的文件  
        FileStatus[] fs = hdfs.listStatus(dst);  
        for (FileStatus f : fs) {   
          System.out.println(f.getPath());  
        }
        
        hdfs.close();
      }
      
      @Test
      // 14 下载文件到本地
      public void testCopyToLocalFile() throws Exception {
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapred.jop.tracker", "localhost:9001");
        
        hdfs = FileSystem.get(conf);  
        //创建HDFS源路径和本地目标路径
        Path src = new Path("hdfs:/xx.txt");  //目标目录/文件 
        Path dst = new Path("file:/root/桌面/new.txt"); //本地目录/文件  
        //拷贝本地文件上传(本地文件,目标路径)  
        hdfs.copyToLocalFile(src, dst);
        
        hdfs.close();
      }
    }
原文地址:https://www.cnblogs.com/chenyansong/p/5514639.html