Hadoop Study Notes, Part 2: File Operations

The HDFS distributed file system:
Strengths: storage of very large files, streaming access, write-once read-many.
Weaknesses: poor fit for large numbers of small files, for low-latency data access, and for multiple users making arbitrary modifications to files.

1. Hadoop is built for big-data processing. With small data volumes, or for tasks with strong real-time requirements, it is a poor fit; not every job moved onto Hadoop will run faster.

2. Large numbers of small files drastically degrade system performance, so small files should be aggregated into large ones before processing, and map output should likewise be combined locally before being shipped to the reducers (see the sketch after this list).

3. The I/O cost of data transfer, whether data lives in memory or on disk, and how shared data is distributed among nodes are all practical concerns.
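As a sketch of the map-side combine mentioned in point 2 (a self-contained word-count example; the class names are mine, not from the original notes): registering a reducer as the combiner makes each map task pre-aggregate its own output before the shuffle, so far less data is transferred to the reducers.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombineDemo {

    public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE); // one record per word, before combining
            }
        }
    }

    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) sum += v.get();
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "combine demo");
        job.setJarByClass(CombineDemo.class);
        job.setMapperClass(TokenMapper.class);
        job.setCombinerClass(SumReducer.class); // sums counts on the map side first
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}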

Node roles involved: NameNode, SecondaryNameNode, DataNode

HDFS splits a large file into blocks (64 MB by default) and stores them across the DataNodes; the NameNode records only the metadata.

Common file operations:

Create, delete, read, write, append, rename, view attributes, change attributes, create a directory, move a file, traverse all files, delete a directory, test whether a path is a directory, test whether a path exists.

1. File operations from the shell:

 ./hadoop fs -help

1.1 List files

./hadoop fs -ls <path>

1.2 Upload a file

./hadoop fs -put <srcpath> <destpath>

1.3 Download a file

./hadoop fs -get <srcpath> <destpath>

1.4 Delete a file or directory

./hadoop fs -rmr <path>

1.5 View a text file

./hadoop fs -cat <path>

1.6 Rebalance file storage across DataNodes

./start-balancer.sh

1.7 View file-system statistics

./hadoop dfsadmin -report

2. File operations in code (the API):

The usual pattern is to open the local stream with plain Java, open the HDFS-side stream through a FileSystem instance (fs), and then move bytes between the two with the relevant Utils class (IOUtils). Whenever you interact with the outside world, create the streams first.

All HDFS file operations go through FileSystem: create, open, delete, copyFromLocalFile, exists, listStatus, rename, getFileBlockLocations, getDataNodeStats, and so on.

FileSystem is the abstract interface a file system exposes to callers; LocalFileSystem and DistributedFileSystem are concrete implementations of it.

When creating fs, be sure to include the URI.create(...) call, otherwise you get an error along the lines of "Wrong FS: ... expected: file:///".

2.1 Create (write):

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class FileCopy {

    public static void main(String[] args)throws Exception{
        if (args.length != 2){
            System.err.println("Usage : filecopy <source> <dest>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        // local side: an ordinary Java input stream
        InputStream in = new BufferedInputStream(new FileInputStream(args[0]));
        // HDFS side: a FileSystem for the destination URI, and an output stream on it
        FileSystem fs = FileSystem.get(URI.create(args[1]), conf);
        OutputStream out = fs.create(new Path(args[1]));
        // copy with a 512-byte buffer; 'true' closes both streams when finished
        IOUtils.copyBytes(in, out, 512, true);
        
    }
}

2.2 Open (read):

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
        InputStream in = null;
        try{
            in = fs.open(new Path(args[0]));
            // 'false': do not close System.out when the copy finishes
            IOUtils.copyBytes(in, System.out, 4096, false);
        }finally{
            IOUtils.closeStream(in);
        }
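fs.open actually returns an FSDataInputStream, which unlike a plain InputStream supports seek. A small follow-on sketch (same conf/fs pattern as above) that prints the file twice by seeking back to the start:

        FSDataInputStream in = null;
        try{
            in = fs.open(new Path(args[0]));
            IOUtils.copyBytes(in, System.out, 4096, false);
            in.seek(0); // jump back to the beginning and read again
            IOUtils.copyBytes(in, System.out, 4096, false);
        }finally{
            IOUtils.closeStream(in);
        }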

2.3 Write the contents of an array to a file

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
Path path = new Path("your file path");
byte[] buf = "hello world!".getBytes();
FSDataOutputStream out = fs.create(path);
out.write(buf, 0, buf.length);
out.close();

2.4 Upload a local file directly

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("HDFS file path"), conf);
Path src = new Path("local file path");
Path dest = new Path("HDFS file path");
fs.copyFromLocalFile(src, dest);
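The download counterpart is copyToLocalFile; a one-line sketch reusing the fs above:

// download: copy an HDFS file to the local file system
fs.copyToLocalFile(new Path("HDFS file path"), new Path("local file path"));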

2.5 Delete:

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(args[0]),conf);
        // second argument 'false': do not delete recursively
        fs.delete(new Path(args[0]), false);

2.6 Check whether a file exists

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path path = new Path("your file path");
boolean isExists = fs.exists(path);

2.7 Rename

Configuration conf = new Configuration();
FileSystem fs  = FileSystem.get(conf);
Path src = new Path("path/source-file-name");
Path dst = new Path("path/destination-file-name");
boolean isRename = fs.rename(src, dst);

2.8 Display attributes:

        Configuration conf  = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
        FileStatus stat = fs.getFileStatus(new Path(args[0]));
        
        System.out.println("Path:\t" + stat.getPath());
        System.out.println("Length:\t" + stat.getLen());
        System.out.println("Owner:\t" + stat.getOwner());
        System.out.println("Group:\t" + stat.getGroup());
        System.out.println("Block size:\t" + stat.getBlockSize());
        System.out.println("Modification time:\t" + stat.getModificationTime());
        System.out.println("Permissions:\t" + stat.getPermission().toString());
        System.out.println("Replication:\t" + stat.getReplication());
        System.out.println("Is directory?:\t" + stat.isDir());
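getModificationTime() returns raw epoch milliseconds; a small follow-on sketch (reusing stat from above) to print it as a readable date:

        // format the epoch-millisecond timestamp as a readable date string
        java.text.SimpleDateFormat fmt = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Modification time:\t" + fmt.format(new java.util.Date(stat.getModificationTime())));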

2.9 Traverse files:

        Configuration conf = new Configuration();
        FileSystem fs  = FileSystem.get(URI.create(args[0]), conf);
        FileStatus[] status = fs.listStatus(new Path(args[0]));
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths){
            System.out.println(p);
        }
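listStatus only returns a single directory level; a minimal self-contained sketch (the class and method names are mine) that walks the whole tree recursively:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListAll {

    // recursively print every file beneath the given path
    public static void listAll(FileSystem fs, Path p) throws IOException {
        for (FileStatus s : fs.listStatus(p)) {
            if (s.isDir()) {
                listAll(fs, s.getPath());        // descend into the subdirectory
            } else {
                System.out.println(s.getPath()); // a regular file
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(args[0]), conf);
        listAll(fs, new Path(args[0]));
    }
}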

2.10 Find the locations of a file's blocks in the HDFS cluster

package cc.test.fileLocation;

import java.net.URI;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileLocation {

    public static void main(String[] args)throws Exception{
        Configuration conf = new Configuration();
        String s = "hdfs://MASTERPC:9000/home/Fea.txt";
        FileSystem fs  = FileSystem.get(URI.create(s),conf);
        FileStatus stat = fs.getFileStatus(new Path(s));
        
        BlockLocation[] blkLocations = fs.getFileBlockLocations(stat, 0, stat.getLen());
        
        for (int i = 0; i < blkLocations.length; i++){
            // getHosts() returns the hostnames holding replicas of this block
            System.out.println("block " + i + ": " + Arrays.toString(blkLocations[i].getHosts()));
        }
    }
}

2.11 Find all the DataNode nodes:

package cc.test.datenodeName;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class DatanodeName {
    
    public static void main(String[] args)throws Exception{
        Configuration conf  = new  Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://MASTERPC:9000/"), conf);
        DistributedFileSystem dfs = (DistributedFileSystem)fs;
        DatanodeInfo[] dnstats = dfs.getDataNodeStats();
        for (int i=0; i<dnstats.length; i++){
            System.out.println(dnstats[i].getHostName());
        }
    }
}

If fs is created without the URI.create() argument, the following error is thrown:

Exception in thread "main" java.lang.ClassCastException: org.apache.hadoop.fs.LocalFileSystem cannot be cast to org.apache.hadoop.hdfs.DistributedFileSystem
This is because a Hadoop program run from Eclipse is not, by default, submitted to the cluster; it runs in Eclipse's local simulated environment, with no job scheduling and without reading the cluster configuration files.

If a path lacks the hdfs://MASTERPC:9000 prefix, it is taken to be a local file on the PC by default. (my guess)

Use a SequenceFile writer to aggregate small-file data into a sequence byte stream, forming one large file in HDFS:

package cc.test.filesequence;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class FileSequence {

    private static String[] temValue = {
        "hello world", 
        "hello china",
        "hello home",
        "hello hadoop"
    };
    public static void main(String[] args)throws Exception{
        if (args.length != 2){
            System.err.println("Usage: fileSeq: <src> <dest>");
            System.exit(2);
        }
        Configuration conf  = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(args[1]), conf);
        Path path = new Path(args[1]);
        Text key = new Text();
        BytesWritable value = new BytesWritable();
    
        SequenceFile.Writer writer = null;
        try{
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass(), CompressionType.BLOCK);
            
            for (int i = 0; i < 5000; i++){
                key.set(String.valueOf(i));
                byte[] bytes = temValue[i % temValue.length].getBytes();
                value.set(bytes, 0, bytes.length); // reuse the BytesWritable rather than allocating one per record
                writer.append(key, value);
            }
            }
        }finally{
                IOUtils.closeStream(writer);
        }
        System.out.println("done");
    }
}
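To check the result, a SequenceFile.Reader can iterate the records back out. A minimal follow-on sketch, reusing fs, conf and path from the writer above:

        // read back each key/value pair written above
        Text key = new Text();
        BytesWritable value = new BytesWritable();
        SequenceFile.Reader reader = null;
        try{
            reader = new SequenceFile.Reader(fs, path, conf);
            while (reader.next(key, value)){ // returns false at end of file
                System.out.println(key + "\t" + new String(value.getBytes(), 0, value.getLength()));
            }
        }finally{
            IOUtils.closeStream(reader);
        }

Finally, a quick test of Hadoop's Writable serialization: serialize an IntWritable into a byte array, print it as hex, then deserialize it back.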
package cc.test.serializeTest;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import java.io.ByteArrayOutputStream;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;

public class SerializeTest {
    public static byte[] serialize(Writable w)throws IOException{
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dataout = new DataOutputStream(out);
        
        w.write(dataout);
        dataout.close();
        return out.toByteArray();
    }
    public static byte[] deserialize(Writable w, byte[] bytes)throws IOException{
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        DataInputStream datain = new DataInputStream(in);
        w.readFields(datain);
        datain.close();
        return bytes;
    }
    public static void main(String[] args)throws Exception{
        IntWritable intw = new IntWritable(9);
        byte[] bytes = serialize(intw);
        String bytes_str = StringUtils.byteToHexString(bytes);
        System.out.println(bytes_str);
        
        IntWritable intw2 = new IntWritable();
        deserialize(intw2, bytes);
        System.out.println(intw2);
        
    }
}
Output:

00000009
9
Original post: https://www.cnblogs.com/sunniflyer/p/4085054.html