Getting Started

# Rate source
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("study_structured_streaming") \
    .enableHiveSupport() \
    .config("spark.debug.maxToStringFields", "100") \
    .getOrCreate()

# The built-in "rate" source generates rows at a fixed rate; handy for testing.
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", "5") \
    .option("rampUpTime", "1s") \
    .option("numPartitions", "2") \
    .load()

# Print each micro-batch to the console until the query is stopped.
df.writeStream \
    .format("console") \
    .outputMode("append") \
    .start() \
    .awaitTermination()

spark.stop()
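For reference, the rate source produces two columns, `timestamp` and `value`. A minimal sketch (reusing the `df` created above) to confirm the schema before starting the query:

# Quick check of the rate source's schema (reuses `df` from above);
# it has two columns: timestamp (timestamp type) and value (long).
df.printSchema()
print(df.isStreaming)  # True: this is a streaming DataFrame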

# Sink example

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("study_structured_streaming") \
    .enableHiveSupport() \
    .config("spark.debug.maxToStringFields", "100") \
    .getOrCreate()

df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", "5") \
    .option("rampUpTime", "1s") \
    .option("numPartitions", "2") \
    .load()

# The file (csv) sink requires a checkpoint location; "path" is an output
# directory into which Spark writes part files, not a single csv file.
df.writeStream \
    .format("csv") \
    .option("checkpointLocation", "ck/20210623/") \
    .option("path", "output/123.csv") \
    .start() \
    .awaitTermination()

spark.stop()
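Once a few micro-batches have completed, the files the sink produced can be checked with an ordinary batch read. A minimal sketch, assuming the "output/123.csv" directory from the example above and default csv options (no header); the app name here is arbitrary:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("inspect_csv_output").getOrCreate()

# "output/123.csv" is a directory of part files written by the file sink.
out = spark.read.csv("output/123.csv", inferSchema=True)
out.show(5, truncate=False)

spark.stop()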
Original post: https://www.cnblogs.com/muyue123/p/14922870.html