实现维度表自动刷新的一种方式

采用的做法是:将更新后的维度表写入最新的分区,读取时只取最新分区(par 最大)的数据。

# coding=utf-8

from pyspark.sql.types import IntegerType, StructType
from pyspark.sql import SparkSession
import datetime
from trancarelib.db.hive_util import HiveWrapper

# Demo script: join a streaming ORC fact source against two Hive tables, each
# restricted to its newest partition (max(par)), and print the joined rows to
# the console until the query terminates.
print('--->begin')
print('程序开始时间:', datetime.datetime.now())

# Build (or reuse) a Hive-enabled Spark session. The chain is wrapped in
# parentheses so it can span several lines without backslash continuations
# (the original trailing-dot lines were a syntax error).
spark = (
    SparkSession.builder
    .appName("study_structured_streaming")
    .enableHiveSupport()
    .config("spark.debug.maxToStringFields", "100")
    .getOrCreate()
)
hivewrapper = HiveWrapper(spark)
#-----------------------------------------------------------------------------------------------------------------------
Schema_dim = StructType().add("id", IntegerType(), True)
# File-based streaming sources need an explicit schema; the fact stream is
# read as two nullable integer columns: id and cnt.
Schema_fact = StructType().add("id", IntegerType(), True).add("cnt", IntegerType(), True)

# Register the latest partition of each static table as a temp view.
# createOrReplaceTempView() returns None, so its result is deliberately not
# bound to a name (the original df_dim / df_join / df variables were always None).
# NOTE(review): whether these views pick up a newer partition while the stream
# is running depends on how Spark re-evaluates the static side of a
# stream-static join per micro-batch — confirm against the Spark version in use.
spark.sql(
    "select * from test.t_20210708_dim where par=(select max(par) from test.t_20210708_dim)"
).createOrReplaceTempView('t_dim')
spark.sql(
    "select * from test.t_20210708_join where par=(select max(par) from test.t_20210708_join)"
).createOrReplaceTempView('t_join')

# Streaming fact source: ORC files under the table's warehouse directory.
spark.readStream.schema(Schema_fact).orc(
    "s3://transsion-sc/user/hive/warehouse/test.db/t_20210708/"
).createOrReplaceTempView('t0')

# Stream-static inner joins: fact.id -> t_dim.id and fact.cnt -> t_join.id.
# The output exposes which partition of each static table was matched.
df1 = spark.sql(
    "select t0.id as id,t_dim.par as t_dim_par,t_join.par as t_join_par"
    " from t0 "
    " join t_dim on t0.id=t_dim.id"
    " join t_join on t0.cnt=t_join.id"
)

# Print every new joined row (untruncated) to the console and block until the
# streaming query stops or fails.
query = (
    df1.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", False)
    .start()
)
query.awaitTermination()

  

原文地址:https://www.cnblogs.com/muyue123/p/15016711.html