2.3. Working with JSON, Parquet, and JDBC databases

1. Start the Spark shell

spark-shell --master spark://s101:7077
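
If the standalone cluster at s101 is not up, the same walkthrough also works in local mode:

spark-shell --master local[*]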

2. Select the database

spark.sql("use spark").show

3. Create a DataFrame

scala> val df = spark.sql("select * from orders")
df: org.apache.spark.sql.DataFrame = [oid: int, num: string ... 2 more fields]

4. Print the data

scala> df.show
+---+-----+-----+---+
|oid|  num|price|cid|
+---+-----+-----+---+
|  1|no001| 12.3|  7|
|  2|no002| 18.8|  4|
+---+-----+-----+---+
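
It is worth checking the schema too, since the JSON round-trip below changes the inferred column types:

df.printSchema   // oid: int and num: string per the output above; the price and cid types come from the Hive table definition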

5.1. Write out as JSON: switch the shell to local mode first, since a file:// path is written to the local filesystem of whichever machines run the tasks

df.write.json("file:///home/centos/myspark/json")
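
If the target directory already exists, the write fails with an AnalysisException; passing a save mode avoids that:

// Overwrite previous output instead of failing on an existing path
df.write.mode("overwrite").json("file:///home/centos/myspark/json")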

  Read the JSON file back

scala> val dfRead = spark.read.json("file:///home/centos/myspark/json")
dfRead: org.apache.spark.sql.DataFrame = [cid: bigint, num: string ... 2 more fields]

scala> dfRead.show
+---+-----+---+-----+
|cid|  num|oid|price|
+---+-----+---+-----+
|  7|no001|  1| 12.3|
|  4|no002|  2| 18.8|
|  3|no003|  3| 20.0|
|  7|no004|  4| 50.0|
|  2|no005|  5| 23.1|
|  3|no006|  6| 39.0|
|  2|no007|  7|  5.0|
|  1|no008|  8|  6.0|
+---+-----+---+-----+
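
Note that the JSON reader re-ordered the columns alphabetically and inferred cid and oid as bigint. To get the original types back, a schema can be supplied up front; a small sketch assuming the types from step 4:

import org.apache.spark.sql.types._

val orderSchema = StructType(Seq(
  StructField("oid", IntegerType),
  StructField("num", StringType),
  StructField("price", DoubleType),
  StructField("cid", IntegerType)
))
val dfTyped = spark.read.schema(orderSchema).json("file:///home/centos/myspark/json")
dfTyped.printSchema   // oid and cid now come back as int rather than bigint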

5.2. Save in Parquet format

df.write.parquet("file:///home/centos/myspark/par")
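
The Parquet writer takes the same save modes, plus a compression option (snappy is Spark's default; the line below just makes it explicit):

df.write.mode("overwrite").option("compression", "snappy").parquet("file:///home/centos/myspark/par")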

  Read the Parquet file back

scala> val dfReadPar = spark.read.parquet("file:///home/centos/myspark/par")
dfReadPar: org.apache.spark.sql.DataFrame = [oid: int, num: string ... 2 more fields]

scala> dfReadPar.show
+---+-----+-----+---+
|oid|  num|price|cid|
+---+-----+-----+---+
|  1|no001| 12.3|  7|
|  2|no002| 18.8|  4|
|  3|no003| 20.0|  3|
|  4|no004| 50.0|  7|
|  5|no005| 23.1|  2|
|  6|no006| 39.0|  3|
|  7|no007|  5.0|  2|
|  8|no008|  6.0|  1|
+---+-----+-----+---+
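
Unlike the JSON round-trip, Parquet stores the schema with the data, so the column order and int types survive. Because the format is columnar, a read can also prune columns and push filters into the scan; a small sketch:

import org.apache.spark.sql.functions.col

// Only oid and price are read from disk; the price filter can be pushed down to the Parquet scan
val cheap = spark.read.parquet("file:///home/centos/myspark/par")
  .select("oid", "price")
  .where(col("price") < 20.0)
cheap.show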

5.3. Working with a JDBC database

// Read table data
scala> val prop = new java.util.Properties()
prop: java.util.Properties = {}

scala> prop.put("driver" , "com.mysql.jdbc.Driver")
res5: Object = null

scala> prop.put("user" , "root")
res6: Object = null

scala> prop.put("password" , "root")
res7: Object = null

scala> val df = spark.read.jdbc("jdbc:mysql://s101:3306/lx" , "wc1" , prop)

  scala> df.show
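
The same read can also be written with the generic format/option API, which makes it easy to push a query down to MySQL instead of pulling the whole table (the subquery here is illustrative):

val dfJdbc = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://s101:3306/lx")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "root")
  .option("dbtable", "(select * from wc1 limit 10) t")   // a subquery aliased as a table
  .load()
dfJdbc.show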

  Write the DataFrame out to a MySQL table

scala> val dfRead = spark.read.json("file:///home/centos/myspark/json")
dfRead: org.apache.spark.sql.DataFrame = [cid: bigint, num: string ... 2 more fields]

Pressing Tab after dfRead.write. lists the available output formats:

scala> dfRead.write.
bucketBy   format       jdbc   mode     options   parquet       save          sortBy
csv        insertInto   json   option   orc       partitionBy   saveAsTable   text

scala> dfRead.write.jdbc
   def jdbc(url: String,table: String,connectionProperties: java.util.Properties): Unit

scala> dfRead.write.jdbc("jdbc:mysql://s101:3306/lx" , "wuyong" ,prop)
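
By default this uses SaveMode.ErrorIfExists and fails if the wuyong table is already there; pass a mode to append or overwrite instead:

// Append to the existing table rather than failing
dfRead.write.mode("append").jdbc("jdbc:mysql://s101:3306/lx" , "wuyong" , prop)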

  Check the result in MySQL

mysql> select * from wuyong;
+------+-------+------+-------+
| cid  | num   | oid  | price |
+------+-------+------+-------+
|    7 | no001 |    1 |  12.3 |
|    4 | no002 |    2 |  18.8 |
|    3 | no003 |    3 |    20 |
|    7 | no004 |    4 |    50 |
|    2 | no005 |    5 |  23.1 |
|    3 | no006 |    6 |    39 |
|    2 | no007 |    7 |     5 |
|    1 | no008 |    8 |     6 |
+------+-------+------+-------+
Original article: https://www.cnblogs.com/lybpy/p/9832225.html