Hadoop基础-SequenceFile的压缩编解码器

                      Hadoop基础-SequenceFile的压缩编解码器

                                            作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

一.Hadoop压缩简介

1>.文件压缩的好处

  第一:较少存储文件占用的磁盘空间,这样就加速数据在磁盘中的传输(比如源文件1G,经过压缩后只有10M,那么文件传输起来就相当的快啦!)

  第二:节省网络带宽,很多时候将数据压缩不仅仅是为了存储,还是为了节省网络带块,在传输数据的时候,先将数据进行压缩处理再发送给接收端,接收端接到数据后会进解压操作,从而拿到真正的数据。

2>.Hadoop压缩算法

  包括Deflate,Gzip,Bzip2,Lz4,Snappy等算法,其中Bzip2是极致压缩比例,而Lz4,Lzo和Snappy则是优化压缩速度,在生产环境下根据算法相关特性进行技术选型。

  注意:Lzo(with index)和Bzip2是可切割的算法,适合在MR中使用。

3>.Hadoop压缩算法的对比

 

二.Gzip压缩与解压缩案例展示

 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.compress;
 7 
 8 import org.apache.hadoop.conf.Configuration;
 9 import org.apache.hadoop.io.IOUtils;
10 import org.apache.hadoop.io.compress.CompressionInputStream;
11 import org.apache.hadoop.io.compress.CompressionOutputStream;
12 import org.apache.hadoop.io.compress.GzipCodec;
13 import org.apache.hadoop.util.ReflectionUtils;
14 
15 import java.io.File;
16 import java.io.FileInputStream;
17 import java.io.FileOutputStream;
18 
19 public class TestCompressCodec {
20     private static final  File srcFile = new File("D:\10.Java\IDE\yhinzhengjieData\CompressCodec\jdk-9.CHM");
21     private static final  File gzipFilePath = new File("D:\10.Java\IDE\yhinzhengjieData\CompressCodec\yinzhengjie.gz");
22     private static final  File gunzipFilePath = new File("D:\10.Java\IDE\yhinzhengjieData\CompressCodec\yinzhengjie.CHM");
23     public static void main(String[] args) throws Exception {
24         GizpCompress();
25         GunizpCompress();
26     }
27 
28     //Gzip进行压缩的方法
29     public static void  GizpCompress() throws Exception {
30         //获取程序开始执行的时间戳
31         long start = System.currentTimeMillis();
32         //实例化一个Configuration,它会自动去加载本地的core-site.xml配置文件的fs.defaultFS属性。(该文件放在项目的resources目录即可。)
33         Configuration conf = new Configuration();
34         //将hdfs写入的路径定义在本地,需要修改默认我文件系统,这样就可以覆盖到之前在core-site.xml配置文件读取到的数据。
35         conf.set("fs.defaultFS","file:///");
36         //通过hadoop提供的反射工具类ReflectionUtils的newInstance方法生成一个GzipCodec方法,第一个参数需要传入一个编解码器类,第二个参数需要传入一个Configuration对象。
37         GzipCodec gzipCodec = ReflectionUtils.newInstance(GzipCodec.class, conf);
38         //通过gzipCodec的createOutputStream方法创建出压缩输出流
39         CompressionOutputStream cos = gzipCodec.createOutputStream(new FileOutputStream(gzipFilePath));
40         //创建出需要压缩的文件
41         FileInputStream fis = new FileInputStream(srcFile);
42         //通过hadoop提供的拷贝类工具进行拷贝数据,第一个参数需要传入一个输入流,第二个参数需要传入一个输出流,第三个参数指定传输的缓冲区大小。
43         IOUtils.copyBytes(fis,cos,1024);
44         //释放资源
45         fis.close();
46         cos.close();
47         //获取程序结束执行的时间戳
48         long end = System.currentTimeMillis();
49         //输出时间压缩的时间
50         System.out.printf("源文件大小是:[%d]字节,压缩时间:[%d],压缩后的文件大小是:[%d]字节.
",srcFile.length(),(end - start),gzipFilePath.length());
51     }
52 
53 
54     //Gzip进行解压的方法
55     public static void  GunizpCompress() throws Exception {
56         //获取程序开始执行的时间戳
57         long start = System.currentTimeMillis();
58         //实例化一个Configuration,它会自动去加载本地的core-site.xml配置文件的fs.defaultFS属性。(该文件放在项目的resources目录即可。)
59         Configuration conf = new Configuration();
60         //将hdfs写入的路径定义在本地,需要修改默认我文件系统,这样就可以覆盖到之前在core-site.xml配置文件读取到的数据。
61         conf.set("fs.defaultFS", "file:///");
62         //通过hadoop提供的反射工具类ReflectionUtils的newInstance方法生成一个GzipCodec方法,第一个参数需要传入一个编解码器类,第二个参数需要传入一个Configuration对象。
63         GzipCodec gzipCodec = ReflectionUtils.newInstance(GzipCodec.class, conf);
64         //通过gzipCodec的createOutputStream方法创建出解压输入流
65         CompressionInputStream cis = gzipCodec.createInputStream(new FileInputStream(gzipFilePath));
66         //创建解压后生成的文件
67         FileOutputStream fos = new FileOutputStream(gunzipFilePath);
68         //通过hadoop提供的工具类拷贝数据
69         IOUtils.copyBytes(cis,fos,1024);
70         //释放资源
71         fos.close();
72         cis.close();
73         //获取程序结束执行的时间戳
74         long end = System.currentTimeMillis();
75         //输出时间压缩的时间
76         System.out.printf("源文件大小是:[%d]字节,压缩时间:[%d],解压后的文件大小是:[%d]字节.
",gzipFilePath.length(),(end - start),gunzipFilePath.length());
77     }
78 }
79 
80 /*
81 以上代码输出结果如下:
82 源文件大小是:[53712527]字节,压缩时间:[2360],压缩后的文件大小是:[53030595]字节.
83 源文件大小是:[53030595]字节,压缩时间:[336],解压后的文件大小是:[53712527]字节.
84  */

  代码执行之前目录文件如下:

  代码执行之后目录文件如下:

三.综合测试Hadoop压缩编解码器(windows环境测试,不包含Snappy压缩

1>.测试代码如下:

 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.compress;
 7 
 8 import org.apache.hadoop.conf.Configuration;
 9 import org.apache.hadoop.io.IOUtils;
10 import org.apache.hadoop.io.compress.*;
11 import org.apache.hadoop.util.ReflectionUtils;
12 import java.io.File;
13 import java.io.FileInputStream;
14 import java.io.FileOutputStream;
15 
16 
17 public class TestCompressCodec {
18     /**
19      * 设置路径动态传参
20      * @param args
21      */
22     public static void main(String[] args) {
23         if(args == null || args.length == 0){
24             System.out.println("需要输入路径");
25             System.exit(-1);
26         }
27         Class[] classes = {
28                 DefaultCodec.class,
29                 GzipCodec.class,
30                 BZip2Codec.class,
31                 Lz4Codec.class,
32                 LzopCodec.class,
33 //                SnappyCodec.class
34         };
35         for(Class clazz : classes){
36             testCompress(clazz, args[0]);
37             testDecompress(clazz,args[0]);
38         }
39     }
40     /**
41      * Gzip压缩
42      * @throws Exception
43      */
44     public static void testCompress(Class clazz, String path) {
45         try {
46             long start = System.currentTimeMillis();
47             Configuration conf = new Configuration();
48             conf.set("fs.defaultFS", "file:///");
49             CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(clazz, conf);
50             FileInputStream fis = new FileInputStream(path);
51             //获取扩展名
52             String ext = codec.getDefaultExtension();
53             //创建压缩输出流
54             CompressionOutputStream cos = codec.createOutputStream(new FileOutputStream(path+ext));
55             IOUtils.copyBytes(fis,cos,1024);
56             fis.close();
57             cos.close();
58             System.out.print("压缩类型:"+ ext+"	"+ "压缩时间:" + (System.currentTimeMillis() - start)+ "	");
59             File f = new File(path+ext);
60             System.out.print("文件大小:"+ f.length() + "	");
61         } catch (Exception e) {
62             e.printStackTrace();
63         }
64     }
65 
66     /**
67      * Gzip解压
68      * @throws Exception
69      */
70     public static void testDecompress(Class clazz,String path) {
71         try {
72             long start = System.currentTimeMillis();
73             Configuration conf = new Configuration();
74             conf.set("fs.defaultFS", "file:///");
75             CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(clazz, conf);
76             //扩展名
77             String ext = codec.getDefaultExtension();
78             //压缩输入流
79             CompressionInputStream cis = codec.createInputStream(new FileInputStream(path+ext));
80             FileOutputStream fos = new FileOutputStream(path+ext+".txt");
81             IOUtils.copyBytes(cis,fos,1024);
82             cis.close();
83             fos.close();
84             System.out.println("解压时间:" + (System.currentTimeMillis() - start));
85         } catch (Exception e) {
86             e.printStackTrace();
87         }
88     }
89 }

2>.Idea进行模拟命令行传参

  点击编辑配置(Edit Configurations)

  输入需要压缩的文件路径

3>.执行代码产物

  执行代码之前目录文件如下:

  执行代码之后目录文件如下:

4>.比较压缩结果

  执行上述代码结果如下:

 

  根据结果反推理论:(以上实验是我取了一次结果为准,生成环境最好以实际生成环境为准,这个数据仅供参考!)

       压缩时间从小到大:lz4 < lzo < gz < deflate < bz2;

       压缩大小从小到大:defalte < gz < lzo < bz2 < lz4;

       解压时间从小到大:lzo <  lz4 < deflate < gz < bz2; 

  注意,由于我此次测试环境是在windows,上述测试不包含Snappy压缩类型,想要查看Snappy压缩,请参考:https://www.cnblogs.com/yinzhengjie/p/9124038.html

四.压缩格式选型

1>.应该使用哪种压缩格式?(文笔摘自<<Hadoop权威指南第四册>>)

   Hadoop 应用处理的数据集非常大,因此需要借助于压缩。使用哪种压缩格式与待处理的文件的大小,格式和所使用的工具相关。下面有一些建议,大致是按照效率从高到低排列的。

       建议一:使用容器格文件格式,例如顺序文件,Avro数据文件,ORCFiles或者Parquet文件,所有含有这些文件格式同时支持压缩和切分。通常最好与一个快速压缩工具联合使用,例如:LZO,LZ4,或者Snappy。(推荐使用)

    建议二:使用支持切分的压缩格式,例如 bzip2(尽管 bzip2非常慢),或者使用通过索引实现切分的压缩格式,例如:LZO。(推荐使用)

    建议三:在应用中将文件切分成块,并使用任意一种压缩格式为每个数据块建立压缩文件(不论它是否支持切分)。在这种情况下,需要合理选择数据块的大小,以确保压缩后数据块的大小近似于HDFS块的大小。(不推荐)

    建议四:存储未经过压缩的文件。(不推荐)

  对大文件来说,不要使用不支持切分整个文件的压缩格式,因为会失去数据的本地特性,进而造成MapReduce应用效率底下。

2>.LZO通过索引实现切分的压缩格式案例展示

/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.compress;

import com.hadoop.compression.lzo.LzoIndexer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class LzoIndex {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        //设置编解码器
        conf.set("io.compression.codecs","org.apache.hadoop.io.compress.LzopCodec");
        //创建一个LzoIndexer对象,需要传入conf对象
        LzoIndexer indexer = new LzoIndexer(conf);
        //需要指定协议为本地“file:///”
        indexer.index(new Path("file:///D:\10.Java\IDE\yhinzhengjieData\CompressCodec\jdk-9.CHM.lzo"));
    }
}

/*
以上代码执行成功后,会在D:10.JavaIDEyhinzhengjieDataCompressCodecjdk-9.CHM.lzo同级目录下生成文件:D:10.JavaIDEyhinzhengjieDataCompressCodecjdk-9.CHM.lzo.index文件。
 */
原文地址:https://www.cnblogs.com/yinzhengjie/p/9119720.html