[Spark] This post should give you a general idea of how to implement DataFrame operations through the Scala API


Overview

There are two main ways to convert an RDD into a DataFrame: using the reflection mechanism (with a case class), or interacting with the RDD programmatically (building the schema with StructType and Row).

Steps

1. Create a Maven project and add the dependencies

<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.5</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
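Two details in this pom are worth calling out. The shade filter that excludes META-INF/*.SF, *.DSA and *.RSA strips the signature files of signed dependencies, which would otherwise cause a SecurityException ("Invalid signature file digest for Manifest main attributes") when the fat jar is run. And <mainClass> is left empty, so the entry class has to be supplied on the command line (for example via spark-submit --class).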

2. Method 1: build the DataFrame via the reflection mechanism with a case class
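Both versions of the code read the same input file, person.txt. The file itself is not shown in the post, but judging from the space-delimited parsing and the console output further down, it presumably contains three space-separated columns (id, name, age):

1 zhangsan 108
2 lisi 28
3 wangwu 58
4 zhaoliu 24
5 zhuqi 35
6 qiuba 28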

Code
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Define the Person case class
case class Person(id: Int, name: String, age: Int)

object SparkDF {
  def main(args: Array[String]): Unit = {
    // Get a SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("SparkDF").master("local[2]").config("spark.driver.host", "localhost").getOrCreate()
    // Get the SparkContext
    val sparkContext: SparkContext = sparkSession.sparkContext
    // Filter out noisy logs
    sparkContext.setLogLevel("WARN")
    // Read the input file
    val fileRDD: RDD[String] = sparkContext.textFile("/Users/zhaozhuang/Desktop/4、Spark/Spark第三天/Spark第三天教案/资料/person.txt")
    // Split each line on the delimiter
    val splitRDD: RDD[Array[String]] = fileRDD.map(x => x.split(" "))
    // Map each array onto the Person case class
    val personRDD: RDD[Person] = splitRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

    // Convert personRDD into a DataFrame
    // The implicits must be imported first
    import sparkSession.implicits._
    val personDF: DataFrame = personRDD.toDF()
    // TODO: at this point the DataFrame has been built successfully via reflection

    /**
     * Next, verify the DataFrame with both the DSL syntax and the SQL syntax
     */
    println("*******************************************DSL syntax test: start********************************************")

    // Show the whole table; show() displays the first 20 rows by default
    personDF.show()
    // Select specific columns
    personDF.select($"name", $"age").show()
    // Print the schema
    personDF.printSchema()
    // Filter rows
    personDF.filter($"age" > 25).show()

    println("*******************************************DSL syntax test: end********************************************")

    println("*******************************************SQL syntax test: start********************************************")
    // Register the DataFrame as a table; there are three ways
    // First way (deprecated)
    personDF.registerTempTable("person1")
    // Second way
    personDF.createTempView("person2")
    // Third way (recommended)
    personDF.createOrReplaceTempView("person3")

    // Query all three tables
    sparkSession.sql("select * from person1").show()
    sparkSession.sql("select * from person2").show()
    sparkSession.sql("select * from person3").show()

    // A left join
    sparkSession.sql("select * from person1 p1 left join person2 p2 on p1.id = p2.id").show()

    println("*******************************************SQL syntax test: end********************************************")

    sparkContext.stop()
    sparkSession.close()
  }
}
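As a side note, toDF() derives the column names from the case-class fields; it also accepts explicit names when different ones are wanted. A minimal sketch (the renamed columns are purely illustrative):

// Same personRDD as above; override the column names at conversion time
val renamedDF: DataFrame = personRDD.toDF("person_id", "person_name", "person_age")
renamedDF.printSchema()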
Console output

*******************************************DSL syntax test: start********************************************
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan|108|
|  2|    lisi| 28|
|  3|  wangwu| 58|
|  4| zhaoliu| 24|
|  5|   zhuqi| 35|
|  6|   qiuba| 28|
+---+--------+---+

+--------+---+
|    name|age|
+--------+---+
|zhangsan|108|
|    lisi| 28|
|  wangwu| 58|
| zhaoliu| 24|
|   zhuqi| 35|
|   qiuba| 28|
+--------+---+

root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan|108|
|  2|    lisi| 28|
|  3|  wangwu| 58|
|  5|   zhuqi| 35|
|  6|   qiuba| 28|
+---+--------+---+

*******************************************DSL syntax test: end********************************************
*******************************************SQL syntax test: start********************************************
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan|108|
|  2|    lisi| 28|
|  3|  wangwu| 58|
|  4| zhaoliu| 24|
|  5|   zhuqi| 35|
|  6|   qiuba| 28|
+---+--------+---+

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan|108|
|  2|    lisi| 28|
|  3|  wangwu| 58|
|  4| zhaoliu| 24|
|  5|   zhuqi| 35|
|  6|   qiuba| 28|
+---+--------+---+

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan|108|
|  2|    lisi| 28|
|  3|  wangwu| 58|
|  4| zhaoliu| 24|
|  5|   zhuqi| 35|
|  6|   qiuba| 28|
+---+--------+---+

+---+--------+---+---+--------+---+
| id|    name|age| id|    name|age|
+---+--------+---+---+--------+---+
|  1|zhangsan|108|  1|zhangsan|108|
|  6|   qiuba| 28|  6|   qiuba| 28|
|  3|  wangwu| 58|  3|  wangwu| 58|
|  5|   zhuqi| 35|  5|   zhuqi| 35|
|  4| zhaoliu| 24|  4| zhaoliu| 24|
|  2|    lisi| 28|  2|    lisi| 28|
+---+--------+---+---+--------+---+

*******************************************SQL syntax test: end********************************************

Process finished with exit code 0
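One thing worth noticing in the left-join result above: because person1 and person2 share the same schema, select * returns every column twice. If a single id column is preferred, the DSL join with a usingColumns sequence deduplicates the join key. A hedged sketch against the same personDF (joinedDF is an illustrative name; name and age still appear twice, since they are not join keys):

// Self-join on id; passing Seq("id") keeps only one id column in the result
val joinedDF: DataFrame = personDF.as("p1").join(personDF.as("p2"), Seq("id"), "left")
joinedDF.show()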

3. Method 2: build the DataFrame with StructType and Row

Code
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object SparkDF2 {
  def main(args: Array[String]): Unit = {
    // Get a SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("SparkDF2").master("local[2]").config("spark.driver.host", "localhost").getOrCreate()
    // Get the SparkContext
    val sparkContext: SparkContext = sparkSession.sparkContext
    // Filter out noisy logs
    sparkContext.setLogLevel("WARN")
    // Read the input file
    val fileRDD: RDD[String] = sparkContext.textFile("/Users/zhaozhuang/Desktop/4、Spark/Spark第三天/Spark第三天教案/资料/person.txt")
    // Split each line on the delimiter
    val arrayRDD: RDD[Array[String]] = fileRDD.map(x => x.split(" "))
    // Convert arrayRDD into an RDD of Row
    val rowRDD: RDD[Row] = arrayRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt))

    // Build the StructType that describes the schema
    val structType = new StructType().add("id", IntegerType).add("name", StringType).add("age", IntegerType)

    // createDataFrame takes two arguments here: an RDD[Row] and a StructType
    val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, structType)

    // From here on, the DSL and SQL operations are the same as before
    personDF.printSchema()

    sparkContext.stop()
    sparkSession.close()
  }
}
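Note that the chained add(...) calls default every column to nullable = true, which is why the schema below differs from the reflection version, where id and age came out as nullable = false (a Scala Int can never hold null). If explicit control over nullability is wanted, an equivalent schema can be built from a StructField list; a minimal sketch:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Equivalent schema with nullable spelled out per field
val structTypeExplicit = StructType(List(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))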
Console output

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)


Process finished with exit code 0
Original post: https://www.cnblogs.com/zzzsw0412/p/12772387.html