spark教程(九)-操作数据库

数据库也是 spark 数据源创建 df 的一种方式,因为比较重要,所以单独算一节。

本文以 postgres 为例

安装 JDBC

首先需要 安装 postgres 的客户端驱动,即 JDBC 驱动,这是官方下载地址,JDBC,根据数据库版本下载对应的驱动

上传至 spark 目录下的 jars 目录

 并设置环境变量

export SPARK_CLASSPATH = /usr/lib/spark/jars

编程模板

如何操作数据库,不同的版本方法不同,网上的教程五花八门,往往尝试不成功。

其实我们可以看 spark 自带的样例, 路径为 /usr/lib/spark/examples/src/main/python/sql    【编码时,sparkSession 需要声明 spark jars 的驱动路径,代码调用 API JDBC To Other Databases

我从 datasource.py 中找到了基本的读写方法,其他自己可以看看

def jdbc_dataset_example(spark):
    # $example on:jdbc_dataset$
    # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
    # Loading data from a JDBC source
    jdbcDF = spark.read 
        .format("jdbc") 
        .option("url", "jdbc:postgresql:dbserver") 
        .option("dbtable", "schema.tablename") 
        .option("user", "username") 
        .option("password", "password") 
        .load()

    jdbcDF2 = spark.read 
        .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
              properties={"user": "username", "password": "password"})

    # Specifying dataframe column data types on read
    jdbcDF3 = spark.read 
        .format("jdbc") 
        .option("url", "jdbc:postgresql:dbserver") 
        .option("dbtable", "schema.tablename") 
        .option("user", "username") 
        .option("password", "password") 
        .option("customSchema", "id DECIMAL(38, 0), name STRING") 
        .load()

# Saving data to a JDBC source jdbcDF.write .format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .save() jdbcDF2.write .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Specifying create table column data types on write jdbcDF.write .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # $example off:jdbc_dataset$

实战案例

仅供参考,请确保 spark 能连接上数据库

from pyspark.sql import SparkSession
import os

# 获取 环境变量 SPARK_CLASSPATH, 当然需要你事先设定了 该变量
# 如果没有设定 SPARK_CLASSPATH, 得到 后面的值 /usr/lib/spark/jars/*
sparkClassPath = os.getenv('SPARK_CLASSPATH', '/usr/lib/spark/jars/*')

### 创建 sparkSession
# spark.driver.extraClassPath 设定了 jdbc 驱动的路径
spark = SparkSession 
    .builder 
    .appName("Python Spark SQL basic example") 
    .master("local") 
    .config("spark.driver.extraClassPath", sparkClassPath) 
    .getOrCreate()

### 连接数据库并读取表
# airDF 已经是个 DataFrame
airDF = spark.read 
    .format("jdbc") 
    .option("url", "jdbc:postgresql://172.16.89.80:5432/postgres") 
    .option("driver", "org.postgresql.Driver") 
    .option("dbtable", "road_point002") 
    .option("user", "postgres") 
    .option("password", "postgres") 
    .load()

### 打印schema
airDF.printSchema()     # df 的表结构,我们看到的就是 列名即格式等

### 只打印前20条 -- dsl 方式
airDF.select('id', 'road_number', 'speed_t').show() # id, road_number, speed_t 列名

### 把 df 转成 table -- sql 方式
def func(x):
    print(x)

airDF.registerTempTable('pg')
spark.sql("select * from pg limit 20").foreach(func)


### 存储为 RDBMS、xml、json等格式
## 存到数据库
airDF.write.jdbc("jdbc:postgresql://172.16.89.80:5432/postgres" ,
                 table = "test",mode="append", properties={"user": "postgres", "password": "postgres"})       # 写入数据库

## 存为 json
airDF.write.format('json').save('jsoin_path')       # 存入分区文件
airDF.coalesce(1).write.format('json').save('filtered.json')  # 存入单个文件,不建议使用
原文地址:https://www.cnblogs.com/yanshw/p/11697289.html