[Learning MapReduce] Working with Sequence File

1. What is a Sequence File?

The official explanation is as follows:

A Sequence File is a file containing a sequence of binary Key/Value records, where both Key and Value are serialized objects. These objects can be anything extending Hadoop’s Writable interface. So they can be simple objects like strings or integers, but they can also be a custom-made object with many attributes.

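My understanding is that a SequenceFile is a file format that stores data as key/value pairs. Both the key and the value must implement Hadoop's Writable interface and be serializable, which means that once they are saved to a Sequence File they are binary and not human-readable. Sequence File is Hadoop's native file format.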
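To make "binary and not human-readable" concrete, here is a minimal sketch of the write/readFields round trip that a Writable performs. PairRecord is a hypothetical stand-in, not a Hadoop class: it only mirrors the two method signatures of Writable so that the example runs with the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical stand-in for a Hadoop Writable (assumption: same method
// signatures as Writable, but no Hadoop dependency).
final class PairRecord {
    String key = "";
    int value;

    void write(DataOutput out) throws IOException {
        out.writeUTF(key);   // 2-byte length prefix followed by the encoded characters
        out.writeInt(value); // 4 bytes, big-endian
    }

    void readFields(DataInput in) throws IOException {
        key = in.readUTF();  // fields must be read in the order they were written
        value = in.readInt();
    }

    // Serializes a record into a byte array.
    static byte[] toBytes(PairRecord record) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            record.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Rebuilds a record from the bytes produced by toBytes.
    static PairRecord fromBytes(byte[] bytes) {
        PairRecord record = new PairRecord();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            record.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return record;
    }

    public static void main(String[] args) {
        PairRecord original = new PairRecord();
        original.key = "abc";
        original.value = 42;

        byte[] bytes = toBytes(original);
        // Prints the raw bytes: a length prefix, the characters, then the int
        System.out.println(bytes.length + " bytes: " + Arrays.toString(bytes));

        PairRecord copy = fromBytes(bytes);
        System.out.println(copy.key + " -> " + copy.value);
    }
}
```

The record "abc" / 42 becomes 9 bytes: 2 for the string length, 3 for the characters, and 4 for the int. Nothing in those bytes names the fields, which is why the data can only be interpreted by a class that reads them back in the same order.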

2. What are the advantages and disadvantages of Sequence File?

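  • Advantages
    • Because it is native to Hadoop, there is no need to install other components or pull in other jars (the Avro data format, for example, requires the Avro jar)
    • Write performance is good, typically better than that of other data formats
    • Read performance is also good when a whole row of data (full row) is read.
  • Disadvantages
    • It is read and written through the Java API and is not compatible with other languages (Scala should also work, since Scala can use Java classes)
    • no schema evolution: if the structure of the data changes, for example you want to delete an attribute, the change cannot be detected automatically and you have to rewrite the object class. (Avro, by contrast, does have this capability.)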
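The "no schema evolution" point can be illustrated with a small sketch. It assumes the same positional DataOutput/DataInput style that a Writable uses; the class SchemaDriftSketch and the field names id, city and amount are hypothetical and chosen only for illustration. Data written with a three-field layout is read back by a reader that has dropped one field.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class SchemaDriftSketch {
    // Version 1 layout: id, city, amount (written positionally, no field names).
    static byte[] writeV1(String id, String city, float amount) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(id);
            out.writeUTF(city);
            out.writeFloat(amount);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reader that matches the version 1 layout.
    static float readAmountV1(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            in.readUTF();          // id
            in.readUTF();          // city
            return in.readFloat(); // amount
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Version 2 reader: the city field was removed from the class,
    // but the old file still contains it.
    static float readAmountV2(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            in.readUTF();          // id still lines up
            return in.readFloat(); // actually consumes the first 4 bytes of the city field
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] oldData = writeV1("d-1", "Austin", 25.0f);
        System.out.println("matching reader:  " + readAmountV1(oldData));
        System.out.println("reader after drop: " + readAmountV2(oldData));
    }
}
```

The second reader does not fail; it silently returns a meaningless number, because nothing in the bytes says where one field ends and the next begins. A format with schema evolution stores enough schema information to resolve this difference when reading.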

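In general, though, we do not use Sequence File. I think a large part of the reason is that it is native to Hadoop and not compatible with other platforms.

So why learn Sequence File? Because by reading and writing Sequence Files we can learn about Hadoop IO and how to define a custom Object in Hadoop.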

3. Read and write Sequence File

The dataset I use is the donation table provided at http://data.donorschoose.org/open-data/overview/.

First we need to define an Object, and this Object has to implement WritableComparable.

package com.qingfei.donation;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by ASUS on 2/21/2018.
 */
public class DonationWritable implements WritableComparable<DonationWritable> {
    public String donationId;
    public String projectId;
    public String cartId;
    public String donorCity;
    public String donorState;
    public String donorZip;
    public String donateByTeacher;
    public String donateTimestamp;
    public float dollarAmount;
    public float support;
    public float total;
    public String paymentMethod;
    public String paymentIncAcctCredit;
    public String paymentIncCampaignGiftCard;
    public String paymentIncWebGiftCard;
    public String paymentWasPromoMatched;
    public String isTeacherReferred;
    public String givingPageId;
    public String givingPageType;
    public String forHonoree;
    public String thankYouPacketMailed;

    public int compareTo(DonationWritable o) {
        return this.donationId.compareTo(o.donationId);
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(donationId);
        dataOutput.writeUTF(projectId);
        //dataOutput.writeUTF(cartId);
        dataOutput.writeUTF(donorCity);
        dataOutput.writeUTF(donorState);
        //dataOutput.writeUTF(donorZip);
        dataOutput.writeUTF(donateByTeacher);
        dataOutput.writeUTF(donateTimestamp);
        dataOutput.writeFloat(dollarAmount);
        dataOutput.writeFloat(support);
        dataOutput.writeFloat(total);
        /*dataOutput.writeUTF(paymentMethod);
        dataOutput.writeUTF(paymentIncAcctCredit);
        dataOutput.writeUTF(paymentIncCampaignGiftCard);
        dataOutput.writeUTF(paymentIncWebGiftCard);
        dataOutput.writeUTF(paymentWasPromoMatched);
        dataOutput.writeUTF(isTeacherReferred);
        dataOutput.writeUTF(givingPageId);
        dataOutput.writeUTF(givingPageType);
        dataOutput.writeUTF(forHonoree);
        dataOutput.writeUTF(thankYouPacketMailed);*/
    }

    public void readFields(DataInput dataInput) throws IOException {
        donationId=dataInput.readUTF();
        projectId=dataInput.readUTF();
        //cartId=dataInput.readUTF();
        donorCity=dataInput.readUTF();
        donorState=dataInput.readUTF();
        //donorZip=dataInput.readUTF();
        donateByTeacher=dataInput.readUTF();
        donateTimestamp=dataInput.readUTF();
        dollarAmount=dataInput.readFloat();
        support=dataInput.readFloat();
        total = dataInput.readFloat();
        /*paymentMethod=dataInput.readUTF();
        paymentIncAcctCredit=dataInput.readUTF();
        paymentIncCampaignGiftCard=dataInput.readUTF();
        paymentIncWebGiftCard=dataInput.readUTF();
        paymentWasPromoMatched = dataInput.readUTF();
        isTeacherReferred=dataInput.readUTF();
        givingPageId=dataInput.readUTF();
        givingPageType=dataInput.readUTF();
        forHonoree=dataInput.readUTF();
        thankYouPacketMailed=dataInput.readUTF();*/

    }

    public void parseLine(String line) throws IOException {
        // strip the double quotes that wrap each CSV field
        line = line.replaceAll("\"","");
        System.out.println(line);
        // limit -1 keeps trailing empty fields
        String[] parts = line.split(",",-1);
        donationId = parts[0];
        projectId = parts[1];
        donorCity = parts[4];
        donorState = parts[5];
        donateByTeacher = parts[7];
        donateTimestamp = parts[8];
        dollarAmount = Float.parseFloat(parts[9]);
        support = Float.parseFloat(parts[10]);
        total = Float.parseFloat(parts[11]);

    }

    @Override
    public String toString() {
        return this.donationId+",["+this.projectId+","+this.donorCity+","+this.donorState+","+this.donateByTeacher+","+this.donateTimestamp+","+this.dollarAmount+","+this.support+","+this.total+"]";
    }
}

DonationWritable
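One caveat about parseLine: it removes every double quote and then splits on every comma, so a quoted field that itself contains a comma would be cut in two and shift all later columns. Whether the donation data contains such fields is not something I have checked, so treat the following as a hedged alternative rather than a required fix. CsvLineSketch and its split method are hypothetical names, and the sample line is made up.

```java
import java.util.ArrayList;
import java.util.List;

final class CsvLineSketch {
    // Splits one CSV line on commas that are outside double quotes.
    // A doubled quote ("") inside a quoted field is read as one literal quote.
    // Fields that span several lines are not handled.
    static List<String> split(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (c == '"') {
                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    current.append('"'); // escaped quote inside a quoted field
                    i++;
                } else {
                    inQuotes = !inQuotes; // opening or closing quote
                }
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString()); // field boundary
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString()); // last field, possibly empty
        return fields;
    }

    public static void main(String[] args) {
        // Made-up sample line with a comma inside the third field
        String line = "\"d-1\",\"p-9\",\"Austin, TX\",\"25.00\",";
        System.out.println(split(line));
        // The strip-quotes-then-split approach sees one field too many
        System.out.println(line.replaceAll("\"", "").split(",", -1).length);
    }
}
```

For the sample line the quote-aware splitter returns 5 fields and keeps "Austin, TX" together, while the strip-and-split approach returns 6.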

DonationWriter
Parses the data in the CSV file and converts the CSV into a Sequence File

package com.qingfei.donation;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;

import java.io.*;

/**
 * Created by ASUS on 2/21/2018.
 */
public class DonationWriter {
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // inputFile is a file on the local file system
        File inputFile = new File(args[0]);
        // outputPath is a file on HDFS
        Path outputPath = new Path(args[1]);

        int processed = 0;
        int errors = 0;
        // try-with-resources closes both the reader and the SequenceFile writer,
        // so buffered records are flushed even if an exception is thrown
        try (BufferedReader br = new BufferedReader(new FileReader(inputFile));
             SequenceFile.Writer writer = SequenceFile.createWriter(new Configuration(),
                    SequenceFile.Writer.file(outputPath),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(DonationWritable.class),
                    SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE, new DefaultCodec())
             )) {
            for (String line = br.readLine();line != null;line = br.readLine()) {
                /*if (processed == 0) {
                    processed++;
                    continue;
                }*/
                try {
                    DonationWritable donation = new DonationWritable();
                    donation.parseLine(line);
                    System.out.println(donation);
                    Text key = new Text(donation.donationId);
                    writer.append(key,donation);
                    writer.hflush();
                } catch (Exception e) {
                    e.printStackTrace();
                    errors++;
                }
                processed++;
                if (processed%1000 ==0) {
                    System.out.println(String.format("%d thousand lines processed", processed / 1000));
                }

            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            System.out.println("Number of lines processed:"+processed);
            System.out.println("Number of errors:"+errors);
            System.out.printf("took %d ms.%n", System.currentTimeMillis()-start);
        }
    }
}

DonationReader
Reads the contents of the SequenceFile

package com.qingfei.donation;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

/**
 * Created by ASUS on 2/22/2018.
 */
public class DonationReader {
    public static void main(String[] args) {
        Path filePath = new Path(args[0]);
        System.out.println(filePath.getName());
        // try-with-resources closes the reader when the loop finishes or fails
        try (SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(),
                SequenceFile.Reader.file(filePath))) {
            System.out.println("Block compressed?"+reader.isBlockCompressed());
            Text key = new Text();
            DonationWritable value = new DonationWritable();
            System.out.println(reader.getKeyClassName());
            System.out.println(reader.getValueClassName());
            while(reader.next(key,value)) {
                System.out.println(value);

            }
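        } catch (Exception e) {
            // do not swallow the exception silently; print it so that failures are visible
            e.printStackTrace();
        }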
    }
}

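Now we can run the programs.

donation.csv is the data I use. Because my virtual machine does not have much disk space, I took only 30 records.

Run DonationWriter to convert the CSV into a Sequence File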
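A quick way to confirm that the writer produced a Sequence File is to look at the first bytes: the format starts with the three ASCII characters SEQ followed by one version byte. The sketch below assumes the output has been copied to the local file system first; SeqHeaderCheck is a hypothetical helper name and uses only the JDK.

```java
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

final class SeqHeaderCheck {
    // True when the header starts with the ASCII letters 'S', 'E', 'Q'
    // and has room for the version byte that follows them.
    static boolean hasSeqMagic(byte[] header) {
        return header.length >= 4
                && header[0] == 'S' && header[1] == 'E' && header[2] == 'Q';
    }

    public static void main(String[] args) throws IOException {
        byte[] header = new byte[4];
        // args[0] is assumed to be a local copy of the writer's output
        try (DataInputStream in = new DataInputStream(new FileInputStream(args[0]))) {
            in.readFully(header); // throws EOFException if the file is shorter than 4 bytes
        }
        if (hasSeqMagic(header)) {
            System.out.println("SequenceFile header found, format version " + header[3]);
        } else {
            System.out.println("No SEQ magic bytes; this is probably not a SequenceFile");
        }
    }
}
```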

Run DonationReader to read the Sequence File

At this point we can read and write Sequence Files, and we can start writing MR programs!

Original article: https://www.cnblogs.com/qingfei1994/p/8506359.html