Java读写HDFS文件

一、依赖包maven路径


  1. <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
  2. <dependency>
  3.     <groupId>org.apache.hadoop</groupId>
  4.     <artifactId>hadoop-client</artifactId>
  5.     <version>2.7.3</version>
  6.     <scope>runtime</scope>
  7. </dependency>

二、针对HDFS文件的操作类HDFSOperate


  1. package com.hdfs.util;
  2.  
  3. import java.io.BufferedReader;
  4. import java.io.File;
  5. import java.io.FileOutputStream;
  6. import java.io.IOException;
  7. import java.io.InputStreamReader;
  8. import java.io.PrintStream;
  9. import java.net.URI;
  10.  
  11. import org.apache.hadoop.conf.Configuration;
  12. import org.apache.hadoop.fs.FSDataInputStream;
  13. import org.apache.hadoop.fs.FSDataOutputStream;
  14. import org.apache.hadoop.fs.FileSystem;
  15. import org.apache.hadoop.fs.Path;
  16. import org.apache.hadoop.io.IOUtils;
  17.  
  18. /**
  19.  * 针对HDFS文件的操作类
  20.  */
  21. public class HDFSOperate {
  22.  
  23. /**
  24.  * 新增(创建)HDFS文件
  25.  * @param hdfs
  26.  */
  27. public void createHDFS(String hdfs){
  28. try {
  29. Configuration conf = new Configuration();
  30. conf.setBoolean("dfs.support.append", true);  
  31. conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
  32.         conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
  33. FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
  34. Path path = new Path(hdfs); 
  35. //判断HDFS文件是否存在
  36. if(fs.exists(path)){
  37. //System.out.println(hdfs + "已经存在!!!");
  38. }else{
  39. FSDataOutputStream hdfsOutStream = fs.create(new Path(hdfs));
  40. hdfsOutStream.close();
  41. }
  42. fs.close();
  43. } catch (Exception e) {
  44. // TODO: handle exception
  45. e.printStackTrace();
  46. }
  47. }
  48. /**
  49.  * 在HDFS文件后面追加内容
  50.  * @param hdfs
  51.  * @param appendContent
  52.  */
  53. public void appendHDFS(String hdfs,String appendContent){
  54. try {
  55. Configuration conf = new Configuration();
  56. conf.setBoolean("dfs.support.append", true);  
  57. conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
  58.         conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
  59. FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
  60. Path path = new Path(hdfs);
  61. //判断HDFS文件是否存在
  62. if(fs.exists(path)){
  63. //System.out.println(hdfs + "已经存在!!!");
  64. }else{
  65. FSDataOutputStream hdfsOutStream = fs.create(new Path(hdfs));
  66. hdfsOutStream.close();
  67. }
  68. FSDataOutputStream hdfsOutStream = fs.append(new Path(hdfs));
  69. byte [] str = appendContent.getBytes("UTF-8");//防止中文乱码
  70. hdfsOutStream.write(str);
  71. hdfsOutStream.close();
  72. fs.close();
  73. } catch (Exception e) {
  74. // TODO: handle exception
  75. e.printStackTrace();
  76. }
  77. }
  78. /**
  79.  * 修改HDFS文件内容 /  删除就是替换为空
  80.  * @param hdfs : hdfs文件路径
  81.  * @param sourceContent :要修改的hdfs文件内容
  82.  * @param changeContent :需要修改成的文件内容
  83.  */
  84. public void change(String hdfs,String sourceContent,String changeContent){
  85. try {
  86. Configuration conf = new Configuration();
  87. conf.setBoolean("dfs.support.append", true);  
  88. conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
  89.         conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
  90. FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
  91. Path path = new Path(hdfs);
  92. //判断HDFS文件是否存在
  93. if(fs.exists(path)){
  94. //System.out.println(hdfs + "已经存在!!!");
  95. FSDataInputStream in = fs.open(path);
  96. BufferedReader bf=new BufferedReader(new InputStreamReader(in));//防止中文乱码
  97. String totalString = "";
  98. String line = null;
  99. while ((line = bf.readLine()) != null) {
  100. totalString += line;
  101. }
  102. String changeString = totalString.replace(sourceContent, changeContent);
  103. FSDataOutputStream hdfsOutStream = fs.create(new Path(hdfs));
  104. byte [] str = changeString.getBytes("UTF-8");
  105. hdfsOutStream.write(str);
  106. hdfsOutStream.close();
  107. }else{
  108. //System.out.println(hdfs + "不存在,无需操作!!!");
  109. }
  110. fs.close();
  111. } catch (Exception e) {
  112. // TODO: handle exception
  113. e.printStackTrace();
  114. }
  115. }
  116. /**
  117.  * 判断要追加的内容是否存在
  118.  * @param hdfs
  119.  * @param appendContent
  120.  * @return
  121.  */
  122. public Boolean isContentExist(String hdfs,String appendContent){
  123. try {
  124. Configuration conf = new Configuration();
  125. conf.setBoolean("dfs.support.append", true);  
  126. conf.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
  127.         conf.set("dfs.client.block.write.replace-datanode-on-failure.enable","true");
  128. FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
  129. Path path = new Path(hdfs);
  130. //判断HDFS文件是否存在
  131. if(fs.exists(path)){
  132. //System.out.println(hdfs + "已经存在!!!");
  133. FSDataInputStream in = fs.open(path);
  134. BufferedReader bf=new BufferedReader(new InputStreamReader(in));//防止中文乱码
  135. String totalString = "";
  136. String line = null;
  137. while ((line = bf.readLine()) != null) {
  138. totalString += line;
  139. }
  140. if(totalString.contains(appendContent)){
  141. return true;
  142. }
  143. }else{
  144. //System.out.println(hdfs + "不存在,无需操作!!!");
  145. }
  146. fs.close();
  147. } catch (Exception e) {
  148. // TODO: handle exception
  149. e.printStackTrace();
  150. }
  151. return false;
  152. }
  153. public static void main(String[] args) throws IOException {
  154. String hdfs = "hdfs://192.168.168.200:9000/test/tes.txt";
  155. HDFSOperate hdfsOperate = new HDFSOperate();
  156. hdfsOperate.createHDFS(hdfs);
  157. hdfsOperate.appendHDFS(hdfs,"测试新增内容");
  158. //hdfsOperate.change(hdfs, "测试新增内容", "测试修改成功");
  159. }
  160. }
原文地址:https://www.cnblogs.com/yangcx666/p/8723866.html