spark读取压缩文件

spark读取压缩文件,对同一个压缩文件内文件进行分布式处理,粒度:文件级

-| .tar.gz

   -| .gz

   -| .zip

-| .zip  

   -| .gz

   -| .zip

使用 sc.binaryFiles() 得到 JavaPairRDD<String, PortableDataStream>

key是压缩文件的完整路径,PortableDataStream是该压缩文件对应的二进制流。

并行化处理:将每个压缩文件根据内部文件拆分成文件流,实现1:n的并行度

 1 // 一个压缩包流,对应多个流,每个流对应一个文件名称
 2     public static JavaPairRDD<PortableDataStream, FilePropertyPojo> getFileListRdd(
 3             JavaPairRDD<String, PortableDataStream> zipRdd) {
 4         return zipRdd.flatMapToPair(tuple2 -> {
 5             List<Tuple2<PortableDataStream, FilePropertyPojo>> targetList = new ArrayList<>();
 6             List<FilePropertyPojo> fileNameList = getFileNameList(tuple2._2);
 7 
 8             for (FilePropertyPojo filePropertyPojo : fileNameList) {
 9 
10                 targetList.add(new Tuple2<>(tuple2._2, filePropertyPojo));
11             }
12             return targetList.iterator();
13 
14         });
15 
16     }
17 
18     private static List<FilePropertyPojo> getFileNameList(PortableDataStream portableDataStream) {
19         List<FilePropertyPojo> fileNameList = new ArrayList<>();
20         try {
21             List<FilePropertyPojo> mrPropertyPojoList = new ArrayList<>();
22             String path = portableDataStream.getPath();
23 
24             String fileCompressMode = path.substring(path.lastIndexOf('.')).toLowerCase();
25             switch (fileCompressMode) {
26             case ".gz":
27                 getFileNameFromGz(portableDataStream, mrPropertyPojoList);
28                 break;
29             case ".zip":
30                 getFileNameFromZip(portableDataStream, mrPropertyPojoList);
31                 break;
32 
33             default:
34             }
35             return mrPropertyPojoList;
36 
37         } catch (Exception e) {
38             //
39         }
40         return fileNameList;
41     }
42 
43     private static void getFileNameFromGz(PortableDataStream portableDataStream,
44             List<FilePropertyPojo> mrPropertyPojoList) {
45         try (TarArchiveInputStream inputStream = new TarArchiveInputStream(
46                 new GZIPInputStream(portableDataStream.open()))) {
47             TarArchiveEntry tarArchiveEntry;
48             while ((tarArchiveEntry = inputStream.getNextTarEntry()) != null) {
49                 try {
50                     getEachFileName(mrPropertyPojoList, tarArchiveEntry.getName(), tarArchiveEntry.getSize());
51 
52                 } catch (Exception e) {
53                     //
54                 }
55             }
56         } catch (Exception e) {
57             //
58         }
59     }
60 
61     private static void getFileNameFromZip(PortableDataStream portableDataStream,
62             List<FilePropertyPojo> mrPropertyPojoList) throws IOException {
63 
64         try (ZipArchiveInputStream zipArchiveInputStream = new ZipArchiveInputStream(portableDataStream.open())) {
65             ZipArchiveEntry nextZipEntry;
66             while ((nextZipEntry = zipArchiveInputStream.getNextZipEntry()) != null) {
67                 try {
68                     getEachFileName(mrPropertyPojoList, nextZipEntry.getName(), nextZipEntry.getSize());
69 
70                 } catch (Exception e) {
71                     //
72                 }
73             }
74         }
75     }
View Code
原文地址:https://www.cnblogs.com/carsonwuu/p/14792548.html