Spark 案例实操

案例实操

  Spark Shell 仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在 IDE
中编制程序,然后打成 jar 包,然后提交到集群,最常用的是创建一个 Maven 项目,利用
Maven 来管理 jar 包的依赖。
 
 

1 编写 WordCount 程序

1)创建一个 Maven 项目 WordCount 并导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.lxl</groupId>
    <artifactId>spark02</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>sparkCore</module>
    </modules>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2)编写代码
package com.lxl

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {

    /*
    1.创建配置信息
     */
    val conf = new SparkConf().setAppName("wc")


    /*
    2.创建sparkcontext
     */
    val sc = new SparkContext(conf)



    /*
    3.处理
     */
    //读取数据
    val lines = sc.textFile(args(0)) //传入路径

    //压平 flatMap
    val words = lines.flatMap(_.split(" "))

    //map(word,1)
    val k2v = words.map((_, 1))

    //resuceBykey(word, x)
    val result = k2v.reduceByKey(_ + _)

    //输出,展示
    //result.collect()
    
    //保存数据到文件
    result.saveAsTextFile(args(1)) //传入的保存文件的目录


    //关闭连接
    sc.stop()



  }

}
 
 
3)打包插件
 
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spark01</artifactId>
        <groupId>com.atlxl</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sparkCore</artifactId>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>WordCount</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
 
 
4)打包到集群测试
 
先将 jar 包拷贝到 spark 的家目录下,改名为wordcount。
[lxl@hadoop102 spark]$ mv sparkCore-1.0-SNAPSHOT.jar wordcount.jar
[lxl@hadoop102 spark]$ bin/spark-submit 
--class com.lxl.WordCount 
--master spark://hadoop102:7077 
--executor-memory 1G 
--total-executor-cores 2 
./wordcount.jar 
hdfs://hadoop102:9000/fruit.tsv 
hdfs://hadoop102:9000/out
 
 
 
 
 
 

2 本地调试

  本地 Spark 程序调试需要使用 local 提交模式,即将本机当做运行环
境,Master 和 Worker 都为本机。运行时直接加断点调试即可。如下:
创建 SparkConf 的时候设置额外属性,表明本地执行:
val conf = new SparkConf().setAppName("WC").setMaster("local[*]")
 
 
完整代码:(只需将 WordCount 第二步的代码稍作修改即可)
package com.lxl

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    /*
    1.创建配置信息
     */
    val conf = new SparkConf().setAppName("wc").setMaster("local[*]")


    /*
    2.创建sparkcontext
     */
    val sc = new SparkContext(conf)



    /*
    3.处理
     */
    //读取数据
    val lines = sc.textFile("C:\Users\67001\Desktop\word.txt") //本地路径

    //压平 flatMap
    val words = lines.flatMap(_.split(" "))

    //map(word,1)
    val k2v = words.map((_, 1))

    //resuceBykey(word, x)
    val result = k2v.reduceByKey(_ + _)

    //输出,展示
//    result.collect()

    //保存数据
//    result.saveAsTextFile(args(1))

    //打印到控制台
    result.foreach(println)


    //关闭连接
    sc.stop()
  }

}

3 远程调试

  通过 IDEA 进行远程调试,主要是将 IDEA 作为 Driver 来提交应用程序,

配置过程如下:

  修改 sparkConf,添加最终运行的 Jar 包、Driver 程序的地址,

并设置 Master 的提交地址:

val conf = new SparkConf().setAppName("wc").setMaster("spark://hadoop102:7077")
.setJars(List("D:\Workspace\IDEA_work\Spark_Work\spark02\sparkCore\target\sparkCore-1.0-SNAPSHOT.jar"))
 完整代码:
package com.lxl

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    /*
    1.创建配置信息
     */
    val conf = new SparkConf().setAppName("wc").setMaster("spark://hadoop102:7077")
      .setJars(List("D:\Workspace\IDEA_work\Spark_Work\spark02\sparkCore\target\sparkCore-1.0-SNAPSHOT.jar"))


    /*
    2.创建sparkcontext
     */
    val sc = new SparkContext(conf)



    /*
    3.处理
     */
    //读取数据
    val lines = sc.textFile("hdfs://hadoop102:9000/fruit.tsv") //HDFS路径

    //压平 flatMap
    val words = lines.flatMap(_.split(" "))

    //map(word,1)
    val k2v = words.map((_, 1))

    //resuceBykey(word, x)
    val result = k2v.reduceByKey(_ + _)

    //输出,展示
//    result.collect()

    //保存数据
    result.saveAsTextFile("hdfs://hadoop102:9000/out1") //保存到HDFS的路径

    //打印到控制台
//    result.foreach(println)


    //关闭连接
    sc.stop()
  }

}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
原文地址:https://www.cnblogs.com/LXL616/p/11139436.html