序列文件的读取和写入

1、序列文件(二进制)的写入

    /**
     * Writes 100 key/value records to a sequence file on the local file system.
     *
     * <p>Each key is an {@code IntWritable} holding the loop index (0..99) and each value is a
     * {@code Text} holding {@code "hello" + index}.
     *
     * @throws IOException if the file cannot be created or a record cannot be written
     */
    @Test
    public void save() throws IOException {
        Configuration conf = new Configuration();
        // Point the default file system at the local disk rather than HDFS.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        // Backslashes must be escaped inside a Java string literal.
        Path path = new Path("D:\\sequence\\1.seq");
        // try-with-resources closes the writer even when append() throws.
        try (SequenceFile.Writer writer =
                SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class)) {
            for (int i = 0; i < 100; i++) {
                writer.append(new IntWritable(i), new Text("hello" + i));
            }
        }
    }

读取序列文件
D:\sequence>hdfs dfs -text file:///d:/sequence/1.seq
18/01/04 11:15:20 WARN zlib.ZlibFactory: Failed to load/initialize native-zlib library
18/01/04 11:15:20 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
0       hello0
1       hello1
2       hello2
3       hello3
4       hello4
5       hello5
6       hello6
7       hello7

2、序列文件的读取

/**
     * Reads every record of the sequence file and prints it as {@code key===>value}.
     *
     * <p>Key and value are read together with {@code reader.next(key, value)}.
     *
     * @throws IOException if the file cannot be opened or a record cannot be read
     */
    @Test
    public void read() throws IOException {
        Configuration conf = new Configuration();
        // Point the default file system at the local disk rather than HDFS.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        // Backslashes must be escaped inside a Java string literal.
        Path path = new Path("D:\\sequence\\1.seq");
        // try-with-resources guarantees the reader (and its underlying stream) is closed.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
            // The same key/value instances are reused for every record.
            IntWritable key = new IntWritable();
            Text value = new Text();
            while (reader.next(key, value)) {
                System.out.println(key.get() + "===>" + value.toString());
            }
        }
    }
    /**
     * Reads every record of the sequence file and prints it as {@code key===>value},
     * fetching the key and the value in two separate steps.
     *
     * <p>{@code reader.next(key)} advances to the next record and reads only the key;
     * {@code reader.getCurrentValue(value)} then reads the value of that record.
     *
     * @throws IOException if the file cannot be opened or a record cannot be read
     */
    @Test
    public void read2() throws IOException {
        Configuration conf = new Configuration();
        // Point the default file system at the local disk rather than HDFS.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        // Backslashes must be escaped inside a Java string literal.
        Path path = new Path("D:\\sequence\\1.seq");
        // try-with-resources guarantees the reader (and its underlying stream) is closed.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
            IntWritable key = new IntWritable();
            Text value = new Text();
            while (reader.next(key)) {
                reader.getCurrentValue(value);
                System.out.println(key.get() + "===>" + value.toString());
            }
        }
    }

3、读取文件指针

/**
     * Reads the sequence file while printing the reader's byte position.
     *
     * <p>The position is printed once right after the reader is opened, and again after each
     * key is read, followed by the record itself as {@code key===>value}.
     *
     * @throws IOException if the file cannot be opened or a record cannot be read
     */
    @Test
    public void read2() throws IOException {
        Configuration conf = new Configuration();
        // Point the default file system at the local disk rather than HDFS.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        // Backslashes must be escaped inside a Java string literal.
        Path path = new Path("D:\\sequence\\1.seq");
        // try-with-resources guarantees the reader (and its underlying stream) is closed.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
            // Position reported before any record has been read.
            long pos = reader.getPosition();
            System.out.println("文件指针:" + pos);
            IntWritable key = new IntWritable();
            Text value = new Text();
            while (reader.next(key)) {
                System.out.println("文件指针:" + reader.getPosition());
                reader.getCurrentValue(value);
                System.out.println(key.get() + "===>" + value.toString());
            }
        }
    }


4、增加同步点

 /**
     * Writes 20 records to a sequence file and inserts sync points between them.
     *
     * <p>The first loop writes keys 0..9 and calls {@code writer.sync()} after every record.
     * The second loop writes keys 0..9 again but calls {@code writer.sync()} only after
     * even-numbered keys.
     *
     * @throws IOException if the file cannot be created or a record cannot be written
     */
    @Test
    public void save() throws IOException {
        Configuration conf = new Configuration();
        // Point the default file system at the local disk rather than HDFS.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        // Backslashes must be escaped inside a Java string literal.
        Path path = new Path("D:\\sequence\\1.seq");
        // try-with-resources closes the writer even when append() or sync() throws.
        try (SequenceFile.Writer writer =
                SequenceFile.createWriter(fs, conf, path, IntWritable.class, Text.class)) {
            // A sync point after every record.
            for (int i = 0; i < 10; i++) {
                writer.append(new IntWritable(i), new Text("hello" + i));
                writer.sync();
            }
            // A sync point only after even-numbered keys.
            for (int i = 0; i < 10; i++) {
                writer.append(new IntWritable(i), new Text("hello" + i));
                if (i % 2 == 0) {
                    writer.sync();
                }
            }
        }
    }

文件指针:128
文件指针:155===>0===>hello0
文件指针:202===>1===>hello1
文件指针:249===>2===>hello2
文件指针:296===>3===>hello3
文件指针:343===>4===>hello4
文件指针:390===>5===>hello5
文件指针:437===>6===>hello6
文件指针:484===>7===>hello7
文件指针:531===>8===>hello8
文件指针:578===>9===>hello9
文件指针:625===>0===>hello0
文件指针:672===>1===>hello1
文件指针:699===>2===>hello2
文件指针:746===>3===>hello3
文件指针:773===>4===>hello4
文件指针:820===>5===>hello5
文件指针:847===>6===>hello6
文件指针:894===>7===>hello7
文件指针:921===>8===>hello8
文件指针:968===>9===>hello9

5、操作同步点

 /**
     * Reads the sequence file starting from an explicit byte position set with
     * {@code reader.seek(155)}.
     *
     * <p>The position right after opening is printed first; each record read after the seek is
     * printed together with the reader position as {@code position===>key===>value}.
     *
     * @throws IOException if the file cannot be opened, the seek fails, or a record cannot be read
     */
    @Test
    public void readsync() throws IOException {
        Configuration conf = new Configuration();
        // Point the default file system at the local disk rather than HDFS.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        // Backslashes must be escaped inside a Java string literal.
        Path path = new Path("D:\\sequence\\1.seq");
        // try-with-resources guarantees the reader (and its underlying stream) is closed.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
            long pos = reader.getPosition();
            System.out.println("文件头部指针:" + pos);
            IntWritable key = new IntWritable();
            Text value = new Text();
            // Choose where reading starts. Per the Hadoop API docs, seek() expects a position
            // that is a record boundary of this file; for an arbitrary offset (e.g. 237 or 209)
            // use reader.sync(position) instead, which moves to the next sync point.
            reader.seek(155);
            while (reader.next(key)) {
                reader.getCurrentValue(value);
                System.out.println("文件指针:" + reader.getPosition() + "===>" + key.get() + "===>" + value.toString());
            }
        }
    }
文件头部指针:128
文件指针:202===>1===>hello1
文件指针:249===>2===>hello2
文件指针:296===>3===>hello3
文件指针:343===>4===>hello4
文件指针:390===>5===>hello5
文件指针:437===>6===>hello6
文件指针:484===>7===>hello7
文件指针:531===>8===>hello8
文件指针:578===>9===>hello9
文件指针:625===>0===>hello0
文件指针:672===>1===>hello1
文件指针:699===>2===>hello2
文件指针:746===>3===>hello3
文件指针:773===>4===>hello4
文件指针:820===>5===>hello5
文件指针:847===>6===>hello6
文件指针:894===>7===>hello7
文件指针:921===>8===>hello8
文件指针:968===>9===>hello9

6、如果指定的位置不是记录边界,使用sync()定位到该位置之后的下一个同步点

/**
     * Reads the sequence file starting from the first sync point after byte position 344,
     * located with {@code reader.sync(344)}.
     *
     * <p>The position right after opening is printed first; each record read after the sync is
     * printed together with the reader position as {@code position===>key===>value}.
     *
     * @throws IOException if the file cannot be opened or a record cannot be read
     */
    @Test
    public void readsync() throws IOException {
        Configuration conf = new Configuration();
        // Point the default file system at the local disk rather than HDFS.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        // Backslashes must be escaped inside a Java string literal.
        Path path = new Path("D:\\sequence\\1.seq");
        // try-with-resources guarantees the reader (and its underlying stream) is closed.
        try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
            long pos = reader.getPosition();
            System.out.println("文件头部指针:" + pos);
            IntWritable key = new IntWritable();
            Text value = new Text();
            // Choose where reading starts. Unlike seek(), sync() accepts an arbitrary offset
            // and advances the reader to the next sync point past it.
            reader.sync(344);
            while (reader.next(key)) {
                reader.getCurrentValue(value);
                System.out.println("文件指针:" + reader.getPosition() + "===>" + key.get() + "===>" + value.toString());
            }
        }
    }

文件头部指针:128
文件指针:437===>6===>hello6
文件指针:484===>7===>hello7
文件指针:531===>8===>hello8
文件指针:578===>9===>hello9
文件指针:625===>0===>hello0
文件指针:672===>1===>hello1
文件指针:699===>2===>hello2
文件指针:746===>3===>hello3
文件指针:773===>4===>hello4
文件指针:820===>5===>hello5
文件指针:847===>6===>hello6
文件指针:894===>7===>hello7
文件指针:921===>8===>hello8
文件指针:968===>9===>hello9

7、一边写入一边压缩

 /**
     * Writes 20 records to a record-compressed sequence file, inserting sync points between them.
     *
     * <p>The writer is created with {@code SequenceFile.CompressionType.RECORD} and a
     * {@code GzipCodec}. The first loop writes keys 0..9 and calls {@code writer.sync()} after
     * every record; the second loop writes keys 0..9 again and calls {@code writer.sync()} only
     * after even-numbered keys.
     *
     * @throws IOException if the file cannot be created or a record cannot be written
     */
    @Test
    public void save() throws IOException {
        Configuration conf = new Configuration();
        // Point the default file system at the local disk rather than HDFS.
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs = FileSystem.get(conf);
        // Backslashes must be escaped inside a Java string literal.
        Path path = new Path("D:\\sequence\\1.seq");
        // try-with-resources closes the writer even when append() or sync() throws.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(
                fs, conf, path, IntWritable.class, Text.class,
                SequenceFile.CompressionType.RECORD, new GzipCodec())) {
            // A sync point after every record.
            for (int i = 0; i < 10; i++) {
                writer.append(new IntWritable(i), new Text("hello" + i));
                writer.sync();
            }
            // A sync point only after even-numbered keys.
            for (int i = 0; i < 10; i++) {
                writer.append(new IntWritable(i), new Text("hello" + i));
                if (i % 2 == 0) {
                    writer.sync();
                }
            }
        }
    }









欢迎关注我的公众号:小秋的博客 CSDN博客:https://blog.csdn.net/xiaoqiu_cr github:https://github.com/crr121 联系邮箱:rongchen633@gmail.com 有什么问题可以给我留言噢~
原文地址:https://www.cnblogs.com/flyingcr/p/10326974.html