package egsql
import java.util.Properties
import com.sun.org.apache.xalan.internal.xsltc.compiler.util.IntType
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object jdbc {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("JdbcOperation").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
///////////////////////read///////////////////////////
val properties = new Properties()
properties.put("user","root")
properties.put("password","xxx")
val url = "jdbc:mysql://127.0.0.1:3306/spark?useUnicode=true&characterEncoding=gbk&zeroDateTimeBehavior=convertToNull"
val stud_scoreDF = sqlContext.read.jdbc(url,"dtspark",properties)
stud_scoreDF.show()
///////////////////////write///////////////////////////
//通过并行化创建RDD
val personRDD = sc.parallelize(Array("blm 5 144", "jerry 18 188", "kitty 5 166")).map(_.split(" "))
//通过StrutType直接指定每个字段的schema
val schema = StructType(
List(
StructField("name",StringType,true),
StructField("age",IntegerType,true),
StructField("score",IntegerType,true)
)
)
//将RDD映射到rowRDD
val rowRDD = personRDD.map(p => Row(p(0).trim, p(1).toInt, p(2).toInt))
//将schema信息应用到rowRDD上
val personDataFrame = sqlContext.createDataFrame(rowRDD,schema)
//创建Properties存储数据库相关属性
//将数据追加到数据库
personDataFrame.write.mode("append").jdbc("jdbc:mysql://127.0.0.1:3306/spark",
"dtspark",properties)
//停止SparkContext
//print again
stud_scoreDF.show()
sc.stop()
}
}
================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/01/29 20:24:17 INFO SparkContext: Running Spark version 2.2.1
18/01/29 20:24:19 INFO SparkContext: Submitted application: JdbcOperation
18/01/29 20:24:19 INFO SecurityManager: Changing view acls to: fangping
18/01/29 20:24:19 INFO SecurityManager: Changing modify acls to: fangping
18/01/29 20:24:19 INFO SecurityManager: Changing view acls groups to:
18/01/29 20:24:19 INFO SecurityManager: Changing modify acls groups to:
18/01/29 20:24:19 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(fangping); groups with view permissions: Set(); users with modify permissions: Set(fangping); groups with modify permissions: Set()
18/01/29 20:24:19 INFO Utils: Successfully started service 'sparkDriver' on port 51005.
18/01/29 20:24:19 INFO SparkEnv: Registering MapOutputTracker
18/01/29 20:24:19 INFO SparkEnv: Registering BlockManagerMaster
18/01/29 20:24:19 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/01/29 20:24:19 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/01/29 20:24:19 INFO DiskBlockManager: Created local directory at C:UsersfangpingAppDataLocalTemplockmgr-b8154e0d-e77b-4ba9-818b-71d3ffb8c553
18/01/29 20:24:19 INFO MemoryStore: MemoryStore started with capacity 339.6 MB
18/01/29 20:24:20 INFO SparkEnv: Registering OutputCommitCoordinator
18/01/29 20:24:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/01/29 20:24:20 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.18.3.13:4040
18/01/29 20:24:20 INFO Executor: Starting executor ID driver on host localhost
18/01/29 20:24:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51014.
18/01/29 20:24:20 INFO NettyBlockTransferService: Server created on 172.18.3.13:51014
18/01/29 20:24:20 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/01/29 20:24:20 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.18.3.13, 51014, None)
18/01/29 20:24:20 INFO BlockManagerMasterEndpoint: Registering block manager 172.18.3.13:51014 with 339.6 MB RAM, BlockManagerId(driver, 172.18.3.13, 51014, None)
18/01/29 20:24:20 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.18.3.13, 51014, None)
18/01/29 20:24:20 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.18.3.13, 51014, None)
18/01/29 20:24:20 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/E:/back/scalaWs/Spark2Demo/spark-warehouse/').
18/01/29 20:24:20 INFO SharedState: Warehouse path is 'file:/E:/back/scalaWs/Spark2Demo/spark-warehouse/'.
18/01/29 20:24:21 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
18/01/29 20:24:26 INFO CodeGenerator: Code generated in 239.883814 ms
18/01/29 20:24:27 INFO CodeGenerator: Code generated in 15.603579 ms
18/01/29 20:24:27 INFO SparkContext: Starting job: show at jdbc.scala:20
18/01/29 20:24:27 INFO DAGScheduler: Got job 0 (show at jdbc.scala:20) with 1 output partitions
18/01/29 20:24:27 INFO DAGScheduler: Final stage: ResultStage 0 (show at jdbc.scala:20)
18/01/29 20:24:27 INFO DAGScheduler: Parents of final stage: List()
18/01/29 20:24:27 INFO DAGScheduler: Missing parents: List()
18/01/29 20:24:27 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at show at jdbc.scala:20), which has no missing parents
18/01/29 20:24:29 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 7.5 KB, free 339.6 MB)
18/01/29 20:24:29 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.2 KB, free 339.6 MB)
18/01/29 20:24:29 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.18.3.13:51014 (size: 4.2 KB, free: 339.6 MB)
18/01/29 20:24:29 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
18/01/29 20:24:29 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at show at jdbc.scala:20) (first 15 tasks are for partitions Vector(0))
18/01/29 20:24:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
18/01/29 20:24:29 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 4649 bytes)
18/01/29 20:24:29 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/01/29 20:24:29 INFO JDBCRDD: closed connection
18/01/29 20:24:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1251 bytes result sent to driver
18/01/29 20:24:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 206 ms on localhost (executor driver) (1/1)
18/01/29 20:24:29 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/01/29 20:24:29 INFO DAGScheduler: ResultStage 0 (show at jdbc.scala:20) finished in 0.224 s
18/01/29 20:24:29 INFO DAGScheduler: Job 0 finished: show at jdbc.scala:20, took 2.466901 s
+-----+---+-----+
| name|age|score|
+-----+---+-----+
| swk|500|45455|
| blm| 5| 144|
|jerry| 18| 188|
|kitty| 5| 166|
+-----+---+-----+
18/01/29 20:24:30 INFO SparkContext: Starting job: jdbc at jdbc.scala:44
18/01/29 20:24:30 INFO DAGScheduler: Got job 1 (jdbc at jdbc.scala:44) with 1 output partitions
18/01/29 20:24:30 INFO DAGScheduler: Final stage: ResultStage 1 (jdbc at jdbc.scala:44)
18/01/29 20:24:30 INFO DAGScheduler: Parents of final stage: List()
18/01/29 20:24:30 INFO DAGScheduler: Missing parents: List()
18/01/29 20:24:30 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[9] at jdbc at jdbc.scala:44), which has no missing parents
18/01/29 20:24:30 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 10.8 KB, free 339.6 MB)
18/01/29 20:24:30 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 5.8 KB, free 339.6 MB)
18/01/29 20:24:30 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.18.3.13:51014 (size: 5.8 KB, free: 339.6 MB)
18/01/29 20:24:30 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
18/01/29 20:24:30 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[9] at jdbc at jdbc.scala:44) (first 15 tasks are for partitions Vector(0))
18/01/29 20:24:30 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
18/01/29 20:24:30 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, PROCESS_LOCAL, 4868 bytes)
18/01/29 20:24:30 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
18/01/29 20:24:30 INFO CodeGenerator: Code generated in 13.199986 ms
18/01/29 20:24:30 INFO CodeGenerator: Code generated in 86.854105 ms
18/01/29 20:24:30 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 970 bytes result sent to driver
18/01/29 20:24:30 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 472 ms on localhost (executor driver) (1/1)
18/01/29 20:24:30 INFO DAGScheduler: ResultStage 1 (jdbc at jdbc.scala:44) finished in 0.473 s
18/01/29 20:24:30 INFO DAGScheduler: Job 1 finished: jdbc at jdbc.scala:44, took 0.496857 s
18/01/29 20:24:30 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/01/29 20:24:30 INFO SparkContext: Starting job: show at jdbc.scala:49
18/01/29 20:24:30 INFO DAGScheduler: Got job 2 (show at jdbc.scala:49) with 1 output partitions
18/01/29 20:24:30 INFO DAGScheduler: Final stage: ResultStage 2 (show at jdbc.scala:49)
18/01/29 20:24:30 INFO DAGScheduler: Parents of final stage: List()
18/01/29 20:24:30 INFO DAGScheduler: Missing parents: List()
18/01/29 20:24:30 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[13] at show at jdbc.scala:49), which has no missing parents
18/01/29 20:24:30 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 7.5 KB, free 339.6 MB)
18/01/29 20:24:30 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.2 KB, free 339.6 MB)
18/01/29 20:24:30 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 172.18.3.13:51014 (size: 4.2 KB, free: 339.6 MB)
18/01/29 20:24:30 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
18/01/29 20:24:30 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[13] at show at jdbc.scala:49) (first 15 tasks are for partitions Vector(0))
18/01/29 20:24:30 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/01/29 20:24:30 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 4649 bytes)
18/01/29 20:24:30 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
18/01/29 20:24:30 INFO JDBCRDD: closed connection
18/01/29 20:24:30 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1217 bytes result sent to driver
18/01/29 20:24:30 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 26 ms on localhost (executor driver) (1/1)
18/01/29 20:24:30 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
18/01/29 20:24:30 INFO DAGScheduler: ResultStage 2 (show at jdbc.scala:49) finished in 0.027 s
18/01/29 20:24:30 INFO DAGScheduler: Job 2 finished: show at jdbc.scala:49, took 0.088025 s
+-----+---+-----+
| name|age|score|
+-----+---+-----+
| swk|500|45455|
| blm| 5| 144|
|jerry| 18| 188|
|kitty| 5| 166|
| blm| 5| 144|
|jerry| 18| 188|
|kitty| 5| 166|
+-----+---+-----+
18/01/29 20:24:30 INFO SparkUI: Stopped Spark web UI at http://172.18.3.13:4040
18/01/29 20:24:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/01/29 20:24:30 INFO MemoryStore: MemoryStore cleared
18/01/29 20:24:30 INFO BlockManager: BlockManager stopped
18/01/29 20:24:30 INFO BlockManagerMaster: BlockManagerMaster stopped
18/01/29 20:24:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/01/29 20:24:30 INFO SparkContext: Successfully stopped SparkContext
18/01/29 20:24:30 INFO ShutdownHookManager: Shutdown hook called
18/01/29 20:24:30 INFO ShutdownHookManager: Deleting directory C:Users\AppDataLocalTempspark-e888fd39-2e41-43e5-829b-ca203b786cef