Spark调用Linux命令实现解压和压缩功能

一.应用场景

  在Spark程序中调用Linux命令,实现一些程序难以实现的功能,例如:发送模拟邮件、文件打包或解压等等

二.代码实现

 1 package big.data.analyse.linux
 2 
 3 import org.apache.log4j.{Level, Logger}
 4 import org.apache.spark.sql.SparkSession
 5 
 6 import scala.sys.process._
 7 /**
 8   * Created by zhen on 2019/10/10.
 9   */
10 object SparkUseLinux {
11   /**
12     * 设置日志级别
13     */
14   Logger.getLogger("org").setLevel(Level.WARN)
15   def main(args: Array[String]) {
16     /**
17       * 调用linux命令解压zip包
18       */
19     println("===开始解压包数据===")
20     val path = "D:\testData.zip"
21     val ml = "unzip " + path + " -d D:\"
22     s"$ml".!
23     /**
24       * 创建入口
25       */
26     val fileUrl = "D:\testData.txt"
27     val spark = SparkSession.builder().appName("SparkUseLinux").master("local[2]").getOrCreate()
28     /**
29       * 加载解压后的数据,计算wordcount
30       */
31     val rdd = spark.sparkContext.textFile(fileUrl)
32       .map(row => row.replace("(", " ").replace(")", " ").replace(".", " ").replace(""", " ").replace(":", " "))//去除文字中的,防止出现歧义
33       .flatMap(row => row.split(" "))//把字符串转换为字符集合
34       .map(row => (row, 1))//把每个字符串转换为map,便于计数
35       .reduceByKey(_+_)//计数
36       .filter(row => !row._1.isEmpty)
37       .filter(row => row._2 > 1)
38 
39     println("---结果---")
40     rdd.foreach(println)
41     /**
42       * 关闭入口
43       */
44     spark.stop()
45   }
46 }

三.结果

  执行前:

  

  执行后:

  

   结果:

    

      

原文地址:https://www.cnblogs.com/yszd/p/11649342.html