spark HelloWorld程序(scala版)

使用本地模式,不需要安装spark,引入相关JAR包即可:

        <!-- Spark core and Spark SQL artifacts, both at version 2.2.0 with the _2.11 artifact suffix. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency> 

创建spark:

        // Master URL; "local" runs Spark inside this JVM, so no cluster installation is needed.
        val sparkUrl = "local"
        val conf = new SparkConf()
                //.setJars(Seq("/home/panteng/IdeaProjects/sparkscala/target/spark-scala.jar"))
                // Hadoop options are only copied into the Hadoop Configuration when they carry
                // the "spark.hadoop." prefix; the bare key would sit in SparkConf unused.
                .set("spark.hadoop.fs.hdfs.impl.disable.cache", "true")
                .set("spark.executor.memory", "8g")

        // Build (or reuse) the session from the SparkConf above plus the extra options.
        val spark = SparkSession
                .builder()
                .appName("Spark SQL basic example")
                .config(conf)
                .config("spark.some.config.option", "some-value")
                .master(sparkUrl)
                .getOrCreate()

加载本地文件:

// Location of the Parquet input on the local file system.
val localParquetPath = "/home/panteng/下载/000001_0"
// Load the Parquet file as a DataFrame.
val parquetFileDF = spark.read.parquet(localParquetPath)
            // HDFS variant: spark.read.parquet("hdfs://10.38.164.80:9000/user/root/000001_0")

文件操作:

// Make the loaded DataFrame queryable from Spark SQL under the name "parquetFile".
parquetFileDF.createOrReplaceTempView("parquetFile")

// Select description plus its substring(description,0,3) as "pre", capped at 100000 rows.
val descDF = spark.sql(
    "SELECT substring(description,0,3) as pre ,description FROM parquetFile LIMIT 100000")
// Remove duplicate rows, then order the result by description.
val diffDesc = descDF
    .dropDuplicates()
    .orderBy("description")
// Register the de-duplicated, ordered data as "pre_desc" and read it back through SQL.
diffDesc.createOrReplaceTempView("pre_desc")
val zhaoshang = spark.sql("select * from pre_desc")
zhaoshang.printSchema()

遍历处理:

// clustering keeps its running state (textPre / textList / regexList) in variables outside
// the closure. Dataset.foreach executes the closure inside executor tasks, where updates to
// such variables are not guaranteed to be visible to the driver, so the rows (at most
// 100000, see the LIMIT in the query) are collected and processed sequentially on the driver.
zhaoshang.collect().foreach(row => clustering(row))
// clustering only emits a regex for a group when it meets the next prefix, so the last
// group must be flushed explicitly (same "more than two samples" threshold it uses).
if (textList.size > 2) {
    regexList = ScalaClient.getRegex(textList) :: regexList
}
// Write every generated regex (one per line) into a single output partition.
val regexRdd = spark.sparkContext.parallelize(regexList)
regexRdd.repartition(1).saveAsTextFile("/home/panteng/下载/temp6")

spark.stop()

附其他函数:

/**
 * Feeds one row into the running prefix group and closes the previous group when the
 * prefix changes.
 *
 * The running state lives outside this function: `textPre` (prefix of the current group),
 * `textList` (descriptions of that group, newest first) and `regexList` (regexes so far).
 *
 * @param row a row with the string columns "pre" and "description"
 * @return "continue" when the row joined the current group; "ok - <regex>" when the row has
 *         a different prefix (the regex is the literal "null" unless the finished group held
 *         more than two descriptions); "error" when an exception was caught
 */
def clustering(row: Row): String = {
    try {
        val pre = row.getAs[String]("pre")
        val description = row.getAs[String]("description")
        if (textPre.equals(pre)) {
            // Replace every digit by '0' so that texts differing only in numbers look alike.
            textList = description.replaceAll("\\d", "0") :: textList
            "continue"
        } else {
            // Prefix changed: turn the finished group into a regex if it is large enough.
            val finishedRegex =
                if (textList.size > 2) {
                    val regex = ScalaClient.getRegex(textList)
                    regexList = regex :: regexList
                    regex
                } else {
                    new Regex("null")
                }
            if (pre != null && description != null) {
                textPre = pre
                // Start the new group with this row. The first sample is digit-normalised
                // like every later one, otherwise its numeric tokens could never match
                // the tokens of the rest of the group.
                textList = List(description.replaceAll("\\d", "0"))
            }
            "ok - " + finishedRegex.toString()
        }
    } catch {
        case e: Exception =>
            println("kkkkkkk" + e)
            "error"
    }
}
package scala.learn

import top.letsgogo.rpc.ThriftProxy

import scala.util.matching.Regex

/**
 * Derives a regular expression from a group of similar sentences: tokens shared by the
 * whole group are kept literally, everything else becomes a `(.*)` capture group.
 * Sentences are tokenised through the `ThriftProxy.client` word-splitting service.
 */
object ScalaClient {
    /** Capture group emitted for every token that is not shared by the whole group. */
    private val Wildcard = "(.*)"

    /** Characters with a special meaning in a regular expression; escaped in literal tokens. */
    private val RegexMetaChars = "\\.[]{}()*+?^$|"

    /**
     * Demo entry point: builds a pattern for five sample bank notifications, prints it and
     * then prints, per sample, whether the pattern finds NO match (`isEmpty`).
     */
    def main(args: Array[String]): Unit = {
        val tokenize = newTokenizer()
        val seqList = List("您尾号9081的招行账户入账人民币689.00元",
            "您尾号1234的招行一卡通支出人民币11.00元",
            "您尾号2345的招行一卡通支出人民币110.00元",
            "您尾号5432的招行一卡通支出人民币200.00元",
            "您尾号5436的招行一卡通入账人民币142.00元")
        val shared = commonWords(seqList, tokenize)

        val template = tokenize("您尾号1234的招行一卡通支出人民币200.00元")
        val pattern = buildPattern(template, shared, collapseWildcards = false)
        println(pattern)
        val regex = new Regex(pattern)
        for (seq <- seqList) {
            println(regex.findAllIn(seq).isEmpty)
        }
    }

    /**
     * Builds a regex describing all sentences in `seqList`, using the first sentence as the
     * template. Consecutive wildcards are merged into one, and every run of '0' characters
     * in the pattern is replaced by `\d+`. Prints the pattern (before that replacement)
     * together with the group size.
     *
     * @param seqList the sentences of one group; must not be empty
     * @return the compiled regex
     * @throws IllegalArgumentException if `seqList` is empty
     */
    def getRegex(seqList: List[String]): Regex = {
        require(seqList.nonEmpty, "getRegex needs at least one sentence")
        val tokenize = newTokenizer()
        val shared = commonWords(seqList, tokenize)

        val pattern = buildPattern(tokenize(seqList.head), shared, collapseWildcards = true)
        println(pattern + "  " + seqList.size)
        // In a replacement string a backslash is itself an escape character, so the literal
        // text \d+ has to be written as \\d+ (four backslashes in Scala source).
        new Regex(pattern.replaceAll("0+", "\\\\d+"))
    }

    /** Obtains the Thrift client once and returns a function splitting a sentence into tokens. */
    private def newTokenizer(): String => List[String] = {
        val client = ThriftProxy.client
        sentence => {
            val raw = client.splitSentence(sentence)
            (0 until raw.size()).map(index => raw.get(index)).toList
        }
    }

    /**
     * Tokens whose total number of occurrences over all sentences is at least the number
     * of sentences.
     */
    private def commonWords(sentences: List[String], tokenize: String => List[String]): Set[String] =
        sentences
            .flatMap(tokenize)
            .groupBy(identity)
            .collect { case (word, occurrences) if occurrences.size >= sentences.size => word }
            .toSet

    /** Escapes every regex metacharacter of `token` so that it is matched literally. */
    private def escapeLiteral(token: String): String =
        token.map(ch => if (RegexMetaChars.indexOf(ch) >= 0) "\\" + ch else ch.toString).mkString

    /**
     * Concatenates the template tokens into a pattern: shared tokens (except "*") literally,
     * all others as a wildcard. With `collapseWildcards`, adjacent wildcards are emitted once.
     */
    private def buildPattern(tokens: List[String], shared: Set[String], collapseWildcards: Boolean): String = {
        val pattern = new StringBuilder
        var lastWasWildcard = false
        for (token <- tokens) {
            if (shared.contains(token) && token != "*") {
                pattern.append(escapeLiteral(token))
                lastWasWildcard = false
            } else if (!(collapseWildcards && lastWasWildcard)) {
                pattern.append(Wildcard)
                lastWasWildcard = true
            }
        }
        pattern.toString
    }
}
批量数据提取正则

输出目录覆盖:

spark.hadoop.validateOutputSpecs false

基于 Dataset 执行 map 时,必须提供 Encoder,否则编译报错!对于某些 Spark 没有内置 Encoder 的类型,只能先转为 RDD 进行 map,之后再由 RDD 转回 DataFrame;对于 Row,可以像下面这样根据 schema 显式构造 Encoder:

// Schema of the rows produced by the map below: two string columns, "pre" and "description".
val schema = new StructType()
    .add("pre", StringType)
    .add("description", StringType)
// Encoder for Row built from that schema; passed explicitly to map.
val encoder = RowEncoder(schema)
// Apply myReplace to every row, then order the result by description.
val replaceRdd = diffDesc
    .map(row => myReplace(row))(encoder)
    .orderBy("description")


任务提交:
./spark-2.2.0-bin-hadoop2.7/bin/spark-submit --name panteng --num-executors 100 --executor-cores 4 ./spark-scala.jar spark://domain:7077

去除部分日志:
//        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
// Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//        spark.sparkContext.setLogLevel("WARN")
 
常用配置:

spark-submit --java 8
--cluster xxx --master yarn-cluster
--class xx.xx.xx.xx.Xxx
--queue default
--conf spark.yarn.appMasterEnv.JAVA_HOME=/opt/soft/jdk1.8.0
--conf spark.executorEnv.JAVA_HOME=/opt/soft/jdk1.8.0
--conf spark.yarn.user.classpath.first=true
--num-executors 128
--conf spark.yarn.job.owners=panteng
--conf spark.executor.memory=10G
--conf spark.dynamicAllocation.enabled=true
--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.minExecutors=2
--conf spark.yarn.executor.memoryOverhead=4000
--conf spark.yarn.driver.memoryOverhead=6000
--conf spark.driver.memory=10G
--conf spark.driver.maxResultSize=4G
--conf spark.rpc.message.maxSize=512
--driver-class-path hdfs://c3prc-hadoop/tmp/u_panteng/lda-lib/guava-14.0.1.jar
xx-1.0-SNAPSHOT.jar parm1 parm2 

原文地址:https://www.cnblogs.com/tengpan-cn/p/7497488.html