Hadoop的I/O操作

HDFS的数据完整性

检验数据是否损坏最常见的措施是:在数据第一次引入系统时计算校验和并在数据通过一个不可靠通道进行传输时再次计算校验和,这样就能发现数据是否被损坏。HDFS会对写入的所有数据计算校验和,并在读取数据时验证校验和。

  1. 客户端写数据:正在写数据的客户端将数据及其校验和发送到由一系列datanode组成的管线,管线中最后一个datanode负责验证校验和。如果检测到错误,客户端会收到一个ChecksumException异常。(datanode负责在收到数据后存储该数据并验证校验和,它在收到客户端的数据或者复制其他datanode的数据时执行这个操作)
  2. 客户端从datanode读数据:验证校验和,将它们与datanode中存储的校验和进行比较。每个datanode中持久保存有一个用于验证校验和的校验日志,它知道每个数据块的最后一次验证时间,客户端成功验证一个数据块后,会告诉datanode,由此更新日志。
  3. 每个datanode也会在后台中运行一个DataBlockScanner,从而定期验证存储在这个datanode上的所有数据块。
  4. 若datanode读取数据时发现数据损坏,首先向namenode报告,再抛出ChecksumException异常,namenode将这个数据块标记为损坏,之后它安排这个数据块的一个复本复制到另一个datanode,如此一来,数据的复本因子回到期望水平,已损坏的复本会被删除。

压缩

压缩的好处:减少存储文件所需要的磁盘空间;加速数据在网络和磁盘上的传输。

mapreduce中为什么不使用gzip格式

由于map以文件的每个切分作为输入,而gzip格式使用DEFLATE算法来存储压缩后的数据,而DEFLATE算法将数据存储在一系列连续的数据块中,需要从每个块的起始位置进行读取,因此不支持切分,而mapreduce需要从数据列的任意位置开始读取,在这种情况下,mapreduce不会尝试切分gzip压缩文件,会将所有数据块全部输入给一个map节点,而大多数数据块并没有存储在执行该map任务的节点,牺牲了数据的本地性。

如何选择压缩格式

按效率从高到低排序:

  1. 使用容器文件格式,如顺序文件、Avro数据文件,它们同时支持压缩和切分。
  2. 使用支持切分的压缩格式,如bzip2(慢)
  3. 将文件切分成块(合理选择块的大小,确保压缩后接近HDFS块的大小),然后用任意方式压缩。
  4. 存储未经压缩的文件
在mapreduce中使用压缩
  1. 对最终输出进行压缩:
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
  2. 对map任务输出进行压缩:
    // vv MaxTemperatureWithMapOutputCompression
    Configuration conf = new Configuration();
    conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
    conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class,
    CompressionCodec.class);
    Job job = new Job(conf);
    // ^^ MaxTemperatureWithMapOutputCompression

序列化

序列化在分布式数据处理的两大领域经常出现:进程间通信和永久存储。
通常情况下,序列化格式需要满足:

  1. 紧凑(紧凑格式能充分利用网络带宽)
  2. 快速(分布式系统需要尽量减少序列化和反序列化的性能开销)
  3. 可扩展(能够满足协议变化的需求,在控制客户端和服务器的过程中,可以直接引进相应的协议)
  4. 支持互操作(可以支持不同语言写的客户端和服务器交互)
hadoop自己的序列化格式-----Writable
  1. 定制Writable集合
    public class TextPair implements WritableComparable {

    private Text first;
    private Text second;

    public TextPair() {
    set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
    set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
    set(first, second);
    }

    public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
    }

    public Text getFirst() {
    return first;
    }

    public Text getSecond() {
    return second;
    }

    public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
    }

    public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
    }

    @Override
    public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
    if (o instanceof TextPair) {
    TextPair tp = (TextPair) o;
    return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
    }

    @Override
    public String toString() {
    return first + " " + second;
    }

    public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
    return cmp;
    }
    return second.compareTo(tp.second);
    }
    }

  2. 定制的comparator-----实现直接比较数据流中的记录
    public static class Comparator extends WritableComparator {

    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

    public Comparator() {
    super(TextPair.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
    byte[] b2, int s2, int l2) {

    try {
    int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
    int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
    int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
    if (cmp != 0) {
    return cmp;
    }
    return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
    b2, s2 + firstL2, l2 - firstL2);
    } catch (IOException e) {
    throw new IllegalArgumentException(e);
    }
    }
    }

// vv TextPairFirstComparator

public static class FirstComparator extends WritableComparator {

private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

public FirstComparator() {
  super(TextPair.class);
}

@Override
public int compare(byte[] b1, int s1, int l1,
                   byte[] b2, int s2, int l2) {
  
  try {
    int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
    int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
    return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
  } catch (IOException e) {
    throw new IllegalArgumentException(e);
  }
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
  if (a instanceof TextPair && b instanceof TextPair) {
    return ((TextPair) a).first.compareTo(((TextPair) b).first);
  }
  return super.compare(a, b);
}
}

// ^^ TextPairFirstComparator

Avro
  • Avro是个支持多语言的数据序列化框架,支持c,c++,c#,Python,java,php,ruby,java。
    他的诞生主要是为了弥补Writable只支持java语言的缺陷。

  • 很多人会问类似的框架还有Thrift和Protocol,那为什么不使用这些框架,而要重新建一个框架呢,
    或者说Avro有哪些不同。首先,Avro和其他框架一样,数据是用与语言无关的schema描述的,不
    同的是Avro的代码生成是可选的,schema和数据存放在一起,而schema使得整个数据的处理过
    程并不生成代码、静态数据类型等,为了实现这些,需要假设读取数据的时候模式是已知的,这样
    就会产生紧耦合的编码,不再需要用户指定字段标识。

  • Avro的schema是JSON格式的,而编码后的数据是二进制格式(当然还有其他可选项)的,这样对
    于已经拥有JSON库的语言可以容易实现。

  • Avro还支持扩展,写的schema和读的schema不一定要是同一个,也就是说兼容新旧schema和新旧
    客户端的读取,比如新的schema增加了一个字段,新旧客户端都能读旧的数据,新客户端按新的sch
    ema去写数据,当旧的客户端读到新的数据时可以忽略新增的字段。

  • Avro还支持datafile文件,schema写在文件开头的元数据描述符里,Avro datafile支持压缩和分割,这
    就意味着可以做Mapreduce的输入。

原文地址:https://www.cnblogs.com/LeonNew/p/5593084.html