Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十六)Structured Streaming中ForeachSink的用法

Structured Streaming内置支持的sink类型有:File sink、Kafka sink、Foreach sink、Console sink、Memory sink。

ForeachWriter实现:

以写入Redis为例:

package com.dx.streaming.producer;

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Example {@link ForeachWriter} that writes to Redis through a pooled Jedis connection.
 *
 * <p>Lifecycle as implemented here: {@link #open(long, long)} borrows a connection from the
 * shared pool, {@link #process(Row)} issues one {@code SET} per row, and
 * {@link #close(Throwable)} closes the borrowed connection.
 *
 * <p>{@code ForeachWriter} is serializable, so every instance field is part of the serialized
 * form. The live connection is therefore declared {@code transient}; it is only ever assigned
 * inside {@code open}, after the writer has been deserialized.
 */
public class TestForeachWriter extends ForeachWriter<Row> {
    private static final long serialVersionUID = 1801843595306161029L;

    /** Connection pool shared by every writer instance loaded in the same JVM. */
    public static JedisPool jedisPool;

    /**
     * Connection borrowed in {@link #open(long, long)}; {@code null} before {@code open} and
     * after {@link #close(Throwable)}. Transient because a live connection must never be
     * serialized together with the writer.
     */
    public transient Jedis jedis;

    // Built once when the class is initialized, so the pool is created in whichever JVM
    // loads this class rather than being shipped with the serialized writer.
    static {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(20);
        config.setMaxIdle(5);
        config.setMaxWaitMillis(1000);
        config.setMinIdle(2);
        config.setTestOnBorrow(false);
        jedisPool = new JedisPool(config, "127.0.0.1", 6379);
    }

    /**
     * Borrows a connection from the shared pool.
     *
     * @return a Jedis connection that the caller is responsible for closing
     */
    public static synchronized Jedis getJedis() {
        return jedisPool.getResource();
    }

    /**
     * Borrows the connection used for this partition/epoch.
     *
     * @param partitionId id of the partition being written
     * @param version     epoch/batch identifier supplied by Spark
     * @return always {@code true}, i.e. the rows of this partition should be processed
     */
    @Override
    public boolean open(long partitionId, long version) {
        jedis = getJedis();
        return true;
    }

    /**
     * Writes one row to Redis.
     *
     * @param row the row to persist
     */
    @Override
    public void process(Row row) {
        // NOTE(review): key and value are fixed literals, so every row overwrites the same
        // entry. This looks like a placeholder; derive both from {@code row} in real code.
        jedis.set("row.key", "row.value");
    }

    /**
     * Releases the connection borrowed in {@link #open(long, long)}.
     *
     * @param arg0 the error that ended processing, or {@code null} on success
     */
    @Override
    public void close(Throwable arg0) {
        // open() may have failed before a connection was assigned; closing blindly would
        // raise a NullPointerException and hide the original failure.
        if (jedis == null) {
            return;
        }
        try {
            jedis.close();
        } finally {
            // Drop the reference so a repeated close() is a no-op.
            jedis = null;
        }
    }
}

Structured Streaming中使用ForeachWriter示例:

package com.dx.streaming.producer;

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import com.databricks.spark.avro.SchemaConverters;

public class TestConsumer {
    private static final String avroFilePath = "D:\Java_Study\workspace\kafka-streaming-learn\conf\avro\userlog.avsc";
    //private static final String avroFilePath = "/user/dx/conf/avro/userlog.avsc";
    private static final String topic = "t-my";

    public static void main(String[] args) throws Exception {
        String appName = "Test Avro";
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName(appName);
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

        Map<String, String> kafkaOptions = new HashMap<String, String>();
        kafkaOptions.put("kafka.bootstrap.servers", "192.168.0.121:9092");
        
        Schema schema = SchemaUtil.getAvroSchemaFromHDFSFile(avroFilePath);    
        AvroParserUDF udf = new AvroParserUDF(avroFilePath);
        StructType type = (StructType) SchemaConverters.toSqlType(schema).dataType();        
        sparkSession.udf().register("deserialize", udf, DataTypes.createStructType(type.fields()));

        Dataset<Row> stream = sparkSession.readStream().format("kafka").options(kafkaOptions).option("subscribe", topic).option("startingOffsets", "earliest").load().select("value").as(Encoders.BINARY())
                .selectExpr("deserialize(value) as row").select("row.*");

        stream.printSchema();

        // Print new data to console
        StreamingQuery query = stream.writeStream().foreach(new TestForeachWriter()).outputMode("update").start();
        
        try {
            query.awaitTermination();
            sparkSession.streams().awaitAnyTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }
}

参考:Spark的那些事(二)Structured streaming中Foreach sink的用法

原文地址:https://www.cnblogs.com/yy3b2007com/p/9307701.html