spark SQL编程

1.编程实现将 RDD 转换为 DataFrame
源文件内容如下(包含 id,name,age):

1,Ella,36
2,Bob,29
3,Jack,29

 

 

将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代码。 

import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object RDDtoDF {
  def main(args: Array[String]) {
   val spark=SparkSession.builder().appName("RddToFrame").master("local").getOrCreate()
    import  spark.implicits._
    val employeeRDD=spark.sparkContext.textFile("file:///usr/local/spark/employee.txt")
    val schemaString="id name age"
    val fields=schemaString.split(" ").map(fieldName=>StructField
    (fieldName,StringType,nullable = true))
    val schema = StructType(fields)
    val  rowRDD  =  employeeRDD.map(_.split(",")).map(attributes  =>
      Row(attributes(0).trim, attributes(1), attributes(2).trim))

    val employeeDF = spark.createDataFrame(rowRDD, schema)
    employeeDF.createOrReplaceTempView("employee")
    val results=spark.sql("select id,name,age from employee")
    results.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show()

  }
}

2.编程实现利用 DataFrame 读写 MySQL 的数据
1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 6-2 所示的
两行数据。
6-2 employee 表原有数据

id name gender Age
1 Alice F 22
2 John M 25

 打开mysql

 


2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 6-3 所示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。

6-3 employee 表新增数据

id name gender age
3 Mary F 26
4 Tom M 23

 

 

import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object TestMySQL {
  def main(args: Array[String]): Unit = {
    val spark=SparkSession.builder().appName("TestMySQL").master("local").getOrCreate()
    import  spark.implicits._
    val employeeRDD=spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
    val schema=StructType(List(StructField("id",IntegerType,
      true),StructField("name",StringType,true),StructField("gender",StringType,true),
      StructField("age",IntegerType,true)))
    val rowRDD=employeeRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
    val employeeDF=spark.createDataFrame(rowRDD,schema)
    val prop=new Properties()
    prop.put("user","root")
    prop.put("password","wangli")
    prop.put("driver","com.mysql.jdbc.Driver")
    employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest","sparktest.employee",prop)
    val jdbcDF = spark.read.format("jdbc").option("url",
      "jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee")
      .option("user","root").option("password", "wangli").load()
    jdbcDF.agg("age" -> "max", "age" -> "sum").show()
  }

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 





本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利.
原文地址:https://www.cnblogs.com/wl2017/p/10604328.html