Databricks 第3篇:pyspark.sql 通过JDBC连接数据库

Databricks Runtime 包含Azure SQL 数据库的 JDBC 驱动程序,本文介绍如何使用数据帧 API 连接到使用 JDBC 的 SQL 数据库,通过 JDBC 接口进行的读取操作和更新操作。

在Databricks的Notebook中,spark是Databricks内置的一个SparkSession,可以通过该SparkSession来创建DataFrame、引用DataFrameReader和DataFrameWriter等。

一,创建JDBC URL

本文适用Python语言和JDBC驱动程序来连接Azure SQL Database,  

jdbcHostname = "Azure SQL Database"
jdbcDatabase = "db_name"
jdbcPort = 1433
jdbcUsername="user_name"
jdbcPassword="user_password"

jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

二,把查询向下推送到数据库引擎

可把整个查询或表向下推送到数据库,且只返回结果。table 参数标识要读取的 JDBC 表。

spark.read.jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)

参数注释:

  • url:JDBC URL
  • table:表名或查询
  • column、numPartitions、lowerBound和upperBound:用于指定分区的列名,分区的数量,分区的列的最小值和最大值
  • predicates:谓词,用于对数据进行过滤,类似于Where子句
  • properties:JDBC数据库连接参数的字典

1,向下推送表

如果把table设置为表名,那么表示查询整个表。

pushdown_query = "table_name"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

对查询的结果进行投影和过滤:

spark.read.jdbc(jdbcUrl, table=pushdown_query, connectionProperties).select("carat", "cut", "price").where("cut = 'Good'")

2,向下推送查询

如果向下推送查询,那么需要采用下方的格式:(query) data_alias

pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

三,向下推送更新

通过JDBC,把DataFrame的内容存储到外部数据表中: 

spark.write.jdbc(url, table, mode=None, properties=None)

参数注释:

  • url:JDBC Url
  • table:外部数据库的表名
  • mode:数据更新的模式,append、overwrite、ignore、error(默认,如果数据存在,抛出异常)
  • properties:JDBC数据库连接参数的字典

参考文档:

原文地址:https://www.cnblogs.com/ljhdo/p/14177031.html