Spark-Dataframe (SQL)

Spark-Dataframe Creation - Reading a JSON File

Data in jsData.js (one JSON object per line):

{"name":"json","age":23,"hobby":"running"}
{"name":"charles","age":32,"hobby":"basketball"}
{"name":"tom","age":28,"hobby":"football"}
{"name":"lili","age":24,"hobby":"running"}
{"name":"bob","age":20,"hobby":"swimming"}
import org.apache.spark.sql.SparkSession

//    Create the SparkSession object
val spark = SparkSession.builder().getOrCreate()

//    Read the JSON file to create a DataFrame
val df = spark.read.json("D:/jsData.js")

//    Show the data
df.show()

//    Print the schema
df.printSchema()

//    Select multiple columns, adding 1 to age
df.select(df("name"), df("age") + 1).show()

//    Rename the name column to username
df.select(df("name").as("username"), df("age")).show()

//    Filter rows where age is greater than 25
df.filter(df("age") > 25).show()

//    Group by age and count occurrences
df.groupBy("age").count().show()

//    Sort by age in descending order
df.sort(df("age").desc).show()

//    Sort by multiple columns (age desc, then name desc); all ages here are distinct, so the result matches the single-column sort
df.sort(df("age").desc, df("name").desc).show()
+---+----------+-------+
|age|     hobby|   name|
+---+----------+-------+
| 23|   running|   json|
| 32|basketball|charles|
| 28|  football|    tom|
| 24|   running|   lili|
| 20|  swimming|    bob|
+---+----------+-------+

root
 |-- age: long (nullable = true)
 |-- hobby: string (nullable = true)
 |-- name: string (nullable = true)

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|   json|       24|
|charles|       33|
|    tom|       29|
|   lili|       25|
|    bob|       21|
+-------+---------+

+--------+---+
|username|age|
+--------+---+
|    json| 23|
| charles| 32|
|     tom| 28|
|    lili| 24|
|     bob| 20|
+--------+---+

+---+----------+-------+
|age|     hobby|   name|
+---+----------+-------+
| 32|basketball|charles|
| 28|  football|    tom|
+---+----------+-------+

+---+-----+
|age|count|
+---+-----+
| 32|    1|
| 28|    1|
| 23|    1|
| 20|    1|
| 24|    1|
+---+-----+

+---+----------+-------+
|age|     hobby|   name|
+---+----------+-------+
| 32|basketball|charles|
| 28|  football|    tom|
| 24|   running|   lili|
| 23|   running|   json|
| 20|  swimming|    bob|
+---+----------+-------+

+---+----------+-------+
|age|     hobby|   name|
+---+----------+-------+
| 32|basketball|charles|
| 28|  football|    tom|
| 24|   running|   lili|
| 23|   running|   json|
| 20|  swimming|    bob|
+---+----------+-------+
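
The same operations can also be written as SQL by registering the DataFrame as a temporary view. A minimal sketch; the view name people is an illustrative choice, not from the original code:

// Register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("people")

// Equivalent to df.filter(df("age") > 25).show()
spark.sql("select * from people where age > 25").show()

// Equivalent to df.groupBy("age").count().show()
spark.sql("select age, count(*) as cnt from people group by age").show()
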
Spark-Dataframe Creation - Converting an RDD to a DataFrame

Data in data.txt:

scala 4
java 2
C 8
C++ 1
python 3
PHP 5
import org.apache.spark.sql.SparkSession


// Define a case class to wrap each row of data
case class Thing(name: String, num: Int)

// Create the SparkSession
val spark = SparkSession
.builder()
.appName("SparkSessionT")
.master("local[1]")
.getOrCreate()

// Wrap each line of data
val rowRdd = spark.sparkContext
.textFile("D:/data.txt")
.map(_.split(" "))
.map(words => Thing(words(0), words(1).trim.toInt)) // wrap each line's fields in the case class

// Import the SparkSession's implicit conversions (needed for toDF)
import spark.implicits._

// Convert the wrapped RDD into a DataFrame
val rddDF = rowRdd.toDF()

// Register a temporary view
rddDF.createOrReplaceTempView("table")

// Show all the data
rddDF.show()

// Select columns and save them in a specific format
//    rddDF.select("name","num").write.format("csv").save("D:/sava.csv")

// Print the schema
rddDF.printSchema()

// Stop the Spark session
spark.stop()
+------+---+
|  name|num|
+------+---+
| scala|  4|
|  java|  2|
|     C|  8|
|   C++|  1|
|python|  3|
|   PHP|  5|
+------+---+

root
 |-- name: string (nullable = true)
 |-- num: integer (nullable = false)
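
The temporary view table was registered above but never queried; a minimal sketch of querying it with SQL (the backticks guard the view name, since table is a SQL keyword, and the num > 3 predicate is illustrative):

// Query the temporary view registered above
spark.sql("select name, num from `table` where num > 3").show()
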
Spark-Dataframe Creation - Building a DataFrame with an Explicit Schema
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}


// Create the SparkSession
val spark = SparkSession
.builder()
.appName("Dataframe")
.master("local")
.getOrCreate()

// Wrap each line of data as a Row
val rowRdd = spark.sparkContext
.textFile("D:/data.txt")
.map(_.split(" ")) // split each line into fields
.map(words => Row(words(0), words(1))) // wrap each line's fields in a Row

val fieldName = "name,num"
// Define the schema for the columns
val fields = fieldName.split(",")
.map(words => StructField(words, StringType, true)) // build each field's definition
val columnFields = StructType(fields) // build the schema from the fields

spark.createDataFrame(rowRdd, columnFields) // combine the row RDD with the schema
.createOrReplaceTempView("thing") // register the thing view

// Query the data with a SQL statement
spark.sql("select * from thing").show()

// Stop the Spark session
spark.stop()
+------+---+
|  name|num|
+------+---+
| scala|  4|
|  java|  2|
|     C|  8|
|   C++|  1|
|python|  3|
|   PHP|  5|
+------+---+
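
One caveat: this approach declared every field as StringType, so num is a string here, unlike the case-class version where it was inferred as an integer. A minimal sketch of casting it to an integer after the fact (run before spark.stop()):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

// Cast the string num column to an integer and verify the new schema
spark.sql("select * from thing")
.withColumn("num", col("num").cast(IntegerType))
.printSchema()
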
Spark-DataFrame Data Reading and Saving
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("SparkSessionT")
.master("local[1]")
.getOrCreate()

// Reuse the session's SparkContext instead of building a separate SparkConf/SparkContext
val sc = spark.sparkContext

// Read the JSON data
val df = spark.read.format("json").load("D:/jsData.js")

// Select columns and save them as CSV
df.select("hobby", "name", "age").write
.format("csv").save("D:/js_to_csv") // save as CSV files
df.write.parquet("D:/json_to_parquet.parquet") // save as a Parquet file

// Read back the Parquet file
spark.read.parquet("D:/json_to_parquet.parquet")
.createOrReplaceTempView("parquetTable")

spark.sql("select name,age from parquetTable").show()
spark.sql("select * from parquetTable").foreach(row => println(row(0), row(1), row(2)))

// sc can read every file under a directory at once and combine their contents
sc.textFile("D:/js_to_csv").foreach(println)
+-------+---+
|   name|age|
+-------+---+
|   json| 23|
|charles| 32|
|    tom| 28|
|   lili| 24|
|    bob| 20|
+-------+---+

(23,running,json)
(32,basketball,charles)
(28,football,tom)
(24,running,lili)
(20,swimming,bob)

running,lili,24
swimming,bob,20
running,json,23
basketball,charles,32
football,tom,28
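
The saved CSV directory can be read back into a DataFrame as well. A minimal sketch; because the files were written without a header, toDF supplies the column names positionally:

// Read the CSV directory back and restore the column names
val csvDF = spark.read.format("csv").load("D:/js_to_csv")
.toDF("hobby", "name", "age")
csvDF.show()
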
Spark-Dataframe - MySQL Operations
import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}


val spark = SparkSession
.builder()
.appName("SparkSessionT")
.master("local[1]")
.getOrCreate()

val jdbcDF = spark.read.format("jdbc") // read data from MySQL via JDBC
.option("url", "jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC") // connection URL
.option("driver", "com.mysql.jdbc.Driver") // JDBC driver class
.option("dbtable", "apps") // table to read
.option("user", "root") // username
.option("password", "123456") // password
.load() // connect and load the table

// Show all rows in the table
jdbcDF.show()

// Insert data into MySQL (the data must first be converted to a DataFrame)
val dataRdd = spark.sparkContext
.parallelize(Array("微信 https://wx.qq.com/ CN", "京东 https://www.jd.com/ CN")) // the records to insert
.map(_.split(" "))
.map(words => Row(words(0), words(1), words(2))) // wrap each record as a Row

// Define the column fields and their types
val fields = StructType(List(StructField("app_name", StringType), StructField("url", StringType), StructField("country", StringType)))

// Bind the data to the schema to create the DataFrame
val rddDF = spark.createDataFrame(dataRdd, fields)

// Wrap the connection properties
val info = new Properties()
info.put("user", "root")
info.put("password", "123456")
info.put("driver", "com.mysql.jdbc.Driver")

// Append the rows to the apps table
rddDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC", "testdb.apps", info)

// Show the table again; the lazy JDBC read re-executes, so the appended rows appear
jdbcDF.show()
+---+--------+--------------------+-------+
| id|app_name|                 url|country|
+---+--------+--------------------+-------+
|  1|  QQ APP|   http://im.qq.com/|     CN|
|  2|  微博 APP|   http://weibo.com/|     CN|
|  3|  淘宝 APP|https://www.taoba...|     CN|
+---+--------+--------------------+-------+

+---+--------+--------------------+-------+
| id|app_name|                 url|country|
+---+--------+--------------------+-------+
|  1|  QQ APP|   http://im.qq.com/|     CN|
|  2|  微博 APP|   http://weibo.com/|     CN|
|  3|  淘宝 APP|https://www.taoba...|     CN|
| 10|      微信|  https://wx.qq.com/|     CN|
| 11|      京东| https://www.jd.com/|     CN|
+---+--------+--------------------+-------+
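
For larger tables, the JDBC read can be parallelized by splitting on a numeric column. A minimal sketch against the same apps table; the id bounds and partition count below are illustrative assumptions:

// Read the apps table in 4 parallel partitions, split on the numeric id column
val parallelDF = spark.read.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "apps")
.option("user", "root")
.option("password", "123456")
.option("partitionColumn", "id") // numeric column to split on
.option("lowerBound", "1") // assumed id range for partitioning
.option("upperBound", "100")
.option("numPartitions", "4")
.load()
parallelDF.show()
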
Source: https://www.cnblogs.com/studyNotesSL/p/11341328.html