HDFS文件的基本操作

HDFS文件的基本操作:

package wjn;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.BasicConfigurator;

/**
 * Small collection of static helpers for basic HDFS file operations
 * (upload, download, append, delete, list, move, ...) together with a set of
 * numbered demo drivers ({@code main1} ... {@code main10}) that exercise them
 * against hard-coded paths.
 *
 * <p>Every helper obtains its own {@link FileSystem} handle and closes it
 * before returning, including when an operation fails.
 *
 * <p>NOTE(review): {@link #ifex(String)} pins {@code fs.defaultFS} to
 * {@code hdfs://localhost:9000}, while every other helper uses a default
 * {@link Configuration}. Confirm both resolve to the same file system in the
 * target deployment.
 */
public class ww {

    /** Console reader used by {@link #main1()} to ask between append and overwrite. */
    public static Scanner sc = new Scanner(System.in);

    /** Buffer size, in bytes, used for stream-to-stream copies. */
    private static final int BUFFER_SIZE = 4096;

    /** Pattern used when printing a file's modification time. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Program entry point; currently runs the delete-file demo {@link #main9()}.
     *
     * @param args command-line arguments (unused)
     * @throws IOException if the HDFS operation fails
     */
    public static void main(String[] args) throws IOException {
        main9();
    }

    /**
     * Demo: moves {@code /user/hadoop/text/text1} to {@code /user/hadoop} and
     * reports the outcome.
     *
     * @throws IOException if the HDFS operation fails
     */
    public static void main10() throws IOException {
        String p1 = "/user/hadoop/text/text1";
        String p2 = "/user/hadoop";
        if (mv(p1, p2)) {
            System.out.println("文件移动成功");
        } else {
            System.out.println("文件移动失败");
        }
    }

    /**
     * Demo: deletes {@code /user/hadoop/text2} and reports the outcome.
     *
     * @throws IOException if the HDFS operation fails
     */
    public static void main9() throws IOException {
        String hdfspath = "/user/hadoop/text2";

        if (deleteFileFromHDFS(hdfspath)) {
            System.out.println("已删除文件" + hdfspath);
        } else {
            System.out.println("删除文件失败");
        }
    }

    /**
     * Demo: creates {@code /user/hadoop} when it is missing; otherwise removes
     * it if it is empty (or if the local {@code forcedelete} switch is set).
     *
     * @throws IOException if the HDFS operation fails
     */
    public static void main7() throws IOException {
        String hdfspath1 = "/user/hadoop";
        boolean forcedelete = false;
        if (!ifex(hdfspath1)) {
            mkdir(hdfspath1);
            System.out.println("创建目录" + hdfspath1);
        } else {
            if (isempty(hdfspath1) || forcedelete) {
                rmDir(hdfspath1);
                System.out.println("删除目录" + hdfspath1);
            } else {
                System.out.println("目录不为空,不删除");
            }
        }
    }

    /**
     * Demo: toggles {@code /user/hadoop/text2} - deletes it when present,
     * otherwise creates it (creating the parent directory first if needed).
     *
     * @throws IOException if the HDFS operation fails
     */
    public static void main6() throws IOException {
        String hdfspath = "/user/hadoop/text2";
        String hdfspath1 = "/user/hadoop";
        if (ifex(hdfspath)) {
            deleteFileFromHDFS(hdfspath);
            System.out.println("该路径存在,删除路径" + hdfspath);
        } else {
            if (!ifex(hdfspath1)) {
                mkdir(hdfspath1);
                System.out.println("创建文件夹" + hdfspath1);
            }
            touchz(hdfspath);
            System.out.println("创建路径" + hdfspath);
        }
    }

    /**
     * Demo: prints details of every file found under {@code /user/hadoop}.
     *
     * @throws IOException if the HDFS operation fails
     */
    public static void main5() throws IOException {
        String hdfspath = "/user/hadoop";
        System.out.println("所有文件信息如下");
        lsDir(hdfspath);
    }

    /**
     * Demo: prints details of {@code /user/hadoop/text2}.
     *
     * @throws IOException if the HDFS operation fails
     */
    public static void main4() throws IOException {
        String hdfspath = "/user/hadoop/text2";
        System.out.println("文件信息如下");
        ls(hdfspath);
    }

    /**
     * Demo: prints the content of {@code /user/hadoop/text2} to the console.
     *
     * @throws IOException if the HDFS operation fails
     */
    public static void main3() throws IOException {
        String hdfspath = "/user/hadoop/text2";
        cat(hdfspath);
        System.out.println("读取完成");
    }

    /**
     * Demo: downloads {@code /user/hadoop/text2} to
     * {@code /home/hadoop/1234.txt}.
     *
     * @throws IOException if the transfer fails
     */
    public static void main2() throws IOException {
        String localpath = "/home/hadoop/1234.txt";
        String hdfspath = "/user/hadoop/text2";
        download(hdfspath, localpath);
        System.out.println("文件下载成功");
    }

    /**
     * Demo: uploads {@code /home/hadoop/123.txt} to {@code /user/hadoop/text2}.
     * When the target already exists the user is asked on the console whether
     * to append (1) or overwrite (2).
     *
     * @throws IOException if the HDFS operation fails
     */
    public static void main1() throws IOException {
        String localpath = "/home/hadoop/123.txt";
        String hdfspath = "/user/hadoop/text2";
        if (ifex(hdfspath)) {
            System.out.println("文件存在,请选择追加(1)还是覆盖(2)");
            // Treat non-numeric (or missing) input like any other invalid
            // choice instead of failing with an exception from nextInt().
            if (!sc.hasNextInt()) {
                System.out.println("输入有误");
                return;
            }
            int choice = sc.nextInt();
            if (choice == 1) {
                appendFileToHDFS(hdfspath, localpath);
                System.out.println("文件追加成功");
            } else if (choice == 2) {
                deleteFileFromHDFS(hdfspath);
                update(localpath, hdfspath);
                System.out.println("文件覆盖成功");
            } else {
                System.out.println("输入有误");
            }
        } else {
            update(localpath, hdfspath);
            System.out.println("文件不存在,上传成功");
        }
    }

    /**
     * Uploads a local file to HDFS.
     *
     * @param localpath path of the local source file
     * @param hdfspath  destination path in HDFS
     * @throws IOException if the local file cannot be read or the write fails
     */
    public static void update(String localpath, String hdfspath) throws IOException {
        // Resources are closed in reverse order, so the HDFS output stream is
        // flushed and closed before its file system handle.
        try (InputStream in = new BufferedInputStream(new FileInputStream(localpath));
             FileSystem fileSystem = openFileSystem(hdfspath);
             OutputStream out = fileSystem.create(new Path(hdfspath))) {
            IOUtils.copyBytes(in, out, BUFFER_SIZE, false);
        }
    }

    /**
     * Checks whether a path exists in HDFS.
     *
     * <p>Unlike the other helpers, this method explicitly targets
     * {@code hdfs://localhost:9000}.
     *
     * @param hdfspath path to test
     * @return {@code true} if the path exists
     * @throws IOException if the file system cannot be reached
     */
    public static boolean ifex(String hdfspath) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        try (FileSystem fs = FileSystem.get(conf)) {
            return fs.exists(new Path(hdfspath));
        }
    }

    /**
     * Creates a directory (including missing parents) in HDFS.
     *
     * @param hdfspath directory path to create
     * @throws IOException if the HDFS operation fails
     */
    public static void mkdir(String hdfspath) throws IOException {
        try (FileSystem fileSystem = openFileSystem(hdfspath)) {
            fileSystem.mkdirs(new Path(hdfspath));
        }
    }

    /**
     * Creates an empty file in HDFS.
     *
     * @param hdfspath path of the file to create
     * @throws IOException if the HDFS operation fails
     */
    public static void touchz(String hdfspath) throws IOException {
        try (FileSystem fileSystem = openFileSystem(hdfspath)) {
            // Opening and immediately closing the stream leaves a zero-length file.
            fileSystem.create(new Path(hdfspath)).close();
        }
    }

    /**
     * Appends the content of a local file to an existing HDFS file.
     *
     * @param hdfsPath      HDFS file to append to
     * @param localFilePath local file whose bytes are appended
     * @throws IOException if the local file cannot be read or the append fails
     */
    public static void appendFileToHDFS(String hdfsPath, String localFilePath) throws IOException {
        Configuration config = new Configuration();
        config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        config.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");

        try (FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), config);
             InputStream in = new BufferedInputStream(new FileInputStream(localFilePath));
             FSDataOutputStream out = fileSystem.append(new Path(hdfsPath))) {
            IOUtils.copyBytes(in, out, BUFFER_SIZE, false);
        }
    }

    /**
     * Deletes a path from HDFS immediately.
     *
     * <p>The deletion is performed right away (rather than being registered
     * for removal when the file system is closed), so the returned flag
     * reflects whether the path was actually removed.
     *
     * @param hdfsPath path to delete; a directory is removed with its content
     * @return {@code true} if the path was deleted, {@code false} otherwise
     *         (for example when it does not exist)
     * @throws IOException if the HDFS operation fails
     */
    public static boolean deleteFileFromHDFS(String hdfsPath) throws IOException {
        try (FileSystem fileSystem = openFileSystem(hdfsPath)) {
            return fileSystem.delete(new Path(hdfsPath), true);
        }
    }

    /**
     * Recursively deletes a directory from HDFS.
     *
     * @param hdfspath directory to delete
     * @throws IOException if the HDFS operation fails
     */
    public static void rmDir(String hdfspath) throws IOException {
        try (FileSystem fileSystem = openFileSystem(hdfspath)) {
            fileSystem.delete(new Path(hdfspath), true);
        }
    }

    /**
     * Tells whether a directory has no entries at all.
     *
     * <p>Sub-directories count as entries, so a directory that only contains
     * (possibly empty) sub-directories is not considered empty.
     *
     * @param hdfspath directory to inspect
     * @return {@code true} if the directory has no children
     * @throws IOException if the path cannot be listed
     */
    public static boolean isempty(String hdfspath) throws IOException {
        try (FileSystem fileSystem = openFileSystem(hdfspath)) {
            return fileSystem.listStatus(new Path(hdfspath)).length == 0;
        }
    }

    /**
     * Downloads an HDFS file to the local file system.
     *
     * @param hdfsPath  HDFS source file
     * @param localPath local destination file
     * @throws IOException if the source cannot be read or the local write fails
     */
    public static void download(String hdfsPath, String localPath) throws IOException {
        // The local file is opened last so it is not created when the HDFS
        // source cannot be opened.
        try (FileSystem fileSystem = openFileSystem(hdfsPath);
             FSDataInputStream in = fileSystem.open(new Path(hdfsPath));
             OutputStream out = new FileOutputStream(localPath)) {
            IOUtils.copyBytes(in, out, BUFFER_SIZE, false);
        }
    }

    /**
     * Prints the content of an HDFS file to standard output, line by line.
     *
     * @param hdfspath HDFS file to print
     * @throws IOException if the file cannot be read
     */
    public static void cat(String hdfspath) throws IOException {
        try (FileSystem fileSystem = openFileSystem(hdfspath);
             FSDataInputStream in = fileSystem.open(new Path(hdfspath));
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }

    /**
     * Prints path, permission, size and modification time for the given path
     * (or, as returned by {@code listStatus}, for each of its entries).
     *
     * @param hdfspath HDFS path to describe
     * @throws IOException if the path cannot be listed
     */
    public static void ls(String hdfspath) throws IOException {
        try (FileSystem fileSystem = openFileSystem(hdfspath)) {
            SimpleDateFormat format = new SimpleDateFormat(TIME_PATTERN);
            for (FileStatus status : fileSystem.listStatus(new Path(hdfspath))) {
                printStatus(status, format);
            }
        }
    }

    /**
     * Prints path, permission, size and modification time for every file
     * found recursively under a directory, one blank line after each file.
     *
     * @param hdfspath HDFS directory to walk
     * @throws IOException if the directory cannot be listed
     */
    public static void lsDir(String hdfspath) throws IOException {
        try (FileSystem fileSystem = openFileSystem(hdfspath)) {
            SimpleDateFormat format = new SimpleDateFormat(TIME_PATTERN);
            RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path(hdfspath), true);
            while (files.hasNext()) {
                printStatus(files.next(), format);
                System.out.println();
            }
        }
    }

    /**
     * Moves (renames) a path inside HDFS.
     *
     * @param path1 source path
     * @param path2 destination path
     * @return {@code true} if the rename succeeded
     * @throws IOException if the HDFS operation fails
     */
    public static boolean mv(String path1, String path2) throws IOException {
        try (FileSystem fileSystem = openFileSystem(path1)) {
            return fileSystem.rename(new Path(path1), new Path(path2));
        }
    }

    /** Obtains a file system handle for the given path using a default configuration. */
    private static FileSystem openFileSystem(String path) throws IOException {
        return FileSystem.get(URI.create(path), new Configuration());
    }

    /** Prints the path, permission, length and formatted modification time of one entry. */
    private static void printStatus(FileStatus status, SimpleDateFormat format) {
        System.out.println("路径:" + status.getPath().toString());
        System.out.println("权限:" + status.getPermission().toString());
        System.out.println("大小:" + status.getLen());
        System.out.println("时间:" + format.format(status.getModificationTime()));
    }

}
原文地址:https://www.cnblogs.com/123456www/p/11525298.html