Using the Java API to Operate on HDFS Files

The implementation code is as follows:

    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;


    public class HDFSTest {

        // Create a new file at the given location and write a string into it
        public static void WriteToHDFS(String file, String words) throws IOException, URISyntaxException
        {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(file), conf);
            Path path = new Path(file);
            FSDataOutputStream out = fs.create(path);   // create the file

            // Both methods below write to the file; the latter is generally preferred,
            // since writeBytes() keeps only the low byte of each char, while
            // write(byte[]) writes exactly the bytes you give it. (Using both, as
            // here, writes the text twice.)
            out.writeBytes(words);
            out.write(words.getBytes("UTF-8"));

            out.close();
            // To write from an input stream, or to copy an existing file into another
            // (open the source file with an input stream), use IOUtils.copyBytes,
            // e.g. (see the standalone sketch after this listing):
            // FSDataInputStream in = fs.open(new Path(args[0]));
            // IOUtils.copyBytes(in, out, 4096, true);   // 4096 is the copy buffer size; true closes both streams when done
        }

        public static void ReadFromHDFS(String file) throws IOException
        {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(file), conf);
            Path path = new Path(file);
            FSDataInputStream in = fs.open(path);

            IOUtils.copyBytes(in, System.out, 4096, true);
            // Alternatively, FSDataInputStream's read methods load the file contents
            // into a byte array, e.g. (see the standalone sketch after this listing):
            /*
             * FileStatus stat = fs.getFileStatus(path);
             * // create the buffer
             * byte[] buffer = new byte[(int) stat.getLen()];
             * in.readFully(0, buffer);
             * in.close();
             * fs.close();
             * return buffer;
             */
        }

        public static void DeleteHDFSFile(String file) throws IOException
        {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(file), conf);
            Path path = new Path(file);
            // The FileSystem delete API offers three methods: deleteOnExit() deletes
            // the path when the JVM exits; the call below, with true as the second
            // argument, deletes recursively when the path is a directory.
            fs.delete(path, true);
            fs.close();
        }

        public static void UploadLocalFileHDFS(String src, String dst) throws IOException
        {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(dst), conf);
            Path pathDst = new Path(dst);
            Path pathSrc = new Path(src);

            fs.copyFromLocalFile(pathSrc, pathDst);
            fs.close();
        }

        public static void ListDirAll(String DirFile) throws IOException
        {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(DirFile), conf);
            Path path = new Path(DirFile);

            FileStatus[] status = fs.listStatus(path);
            // Option 1
            for (FileStatus f : status)
            {
                System.out.println(f.getPath().toString());
            }
            // Option 2
            Path[] listedPaths = FileUtil.stat2Paths(status);
            for (Path p : listedPaths) {
                System.out.println(p.toString());
            }
        }

        public static void main(String[] args) throws IOException, URISyntaxException
        {
            // List all files under the directory
            ListDirAll("hdfs://ubuntu:9000/user/kqiao");

            String fileWrite = "hdfs://ubuntu:9000/user/kqiao/test/FileWrite";
            String words = "These words are to be written into the file!\n";
            WriteToHDFS(fileWrite, words);
            // Read back the contents of fileWrite and print them to the terminal
            ReadFromHDFS(fileWrite);
            // Delete the fileWrite file created above
            DeleteHDFSFile(fileWrite);
            // Assuming there is a local file named uploadFile, upload it to HDFS
    //      String LocalFile = "file:///home/kqiao/hadoop/MyHadoopCodes/uploadFile";
    //      UploadLocalFileHDFS(LocalFile, fileWrite);
        }
    }
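
The IOUtils.copyBytes call sketched in the comments of WriteToHDFS can be fleshed out into a standalone file-to-file copy. A minimal sketch, assuming both paths live on the same cluster as the listing above (the class name HDFSCopy and the source/destination paths are hypothetical, for illustration only):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    public class HDFSCopy {
        // Copy one HDFS file to another via IOUtils.copyBytes
        public static void copy(String src, String dst) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(src), conf);
            FSDataInputStream in = fs.open(new Path(src));
            FSDataOutputStream out = fs.create(new Path(dst));
            // 4096 is the copy buffer size; true closes both streams when the copy finishes
            IOUtils.copyBytes(in, out, 4096, true);
            fs.close();
        }

        public static void main(String[] args) throws Exception {
            // hypothetical source/destination on the cluster from the listing above
            copy("hdfs://ubuntu:9000/user/kqiao/test/FileWrite",
                 "hdfs://ubuntu:9000/user/kqiao/test/FileWriteCopy");
        }
    }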

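The commented-out fragment in ReadFromHDFS can likewise be completed into a working helper. A minimal sketch that reads an entire HDFS file into a byte array (the class name HDFSReadAll and the path in main are hypothetical; it assumes the file fits in memory):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HDFSReadAll {
        // Read the whole file into a byte array using readFully
        public static byte[] readAll(String file) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(file), conf);
            Path path = new Path(file);
            FileStatus stat = fs.getFileStatus(path);       // gives the file length
            byte[] buffer = new byte[(int) stat.getLen()];  // assumes length < 2 GB
            FSDataInputStream in = fs.open(path);
            in.readFully(0, buffer);                        // fill the buffer starting at offset 0
            in.close();
            fs.close();
            return buffer;
        }

        public static void main(String[] args) throws Exception {
            byte[] data = readAll("hdfs://ubuntu:9000/user/kqiao/test/FileWrite");
            System.out.write(data);
        }
    }
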
Note: after creating a file with FSDataOutputStream os = hdfs.create(new Path(args[0])), calling os.flush() flushes the data stream.

Sometimes a file being written is not immediately visible to other readers: only once more than one block has been written can other readers see the first block, and even then they cannot see the block currently being written. You can call out.sync() to force all buffers to be synchronized to the datanodes (in Hadoop 2.x and later, sync() is deprecated in favor of hflush()/hsync()). In fact, every os.close() implicitly performs a sync().
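
A minimal sketch of the visibility behavior just described, assuming the same hdfs://ubuntu:9000 address as above (the class name HFlushDemo and the file path are hypothetical; hflush() is the Hadoop 2.x replacement for the deprecated sync()):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HFlushDemo {
        public static void main(String[] args) throws Exception {
            String file = "hdfs://ubuntu:9000/user/kqiao/test/flushDemo";  // hypothetical path
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(file), conf);
            FSDataOutputStream out = fs.create(new Path(file));

            out.write("first record\n".getBytes("UTF-8"));
            // Without a flush/sync, new readers may not see these bytes yet.
            out.hflush();   // force the written bytes to be visible to new readers
                            // (on Hadoop 1.x this was out.sync())

            out.write("second record\n".getBytes("UTF-8"));
            out.close();    // close() also makes all remaining bytes visible
            fs.close();
        }
    }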

Original article: https://www.cnblogs.com/gaopeng527/p/4399219.html