Structured Streaming ships with the following sink types by default: File sink, Foreach sink, Console sink, and Memory sink.
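Each built-in sink is selected through format(...) on the DataStreamWriter; the Foreach sink is the exception and is attached with foreach(...), as shown later in this post. Below is a minimal sketch of the other three, assuming a streaming Dataset<Row> named stream is already defined; the paths and the query name are hypothetical:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

public class SinkExamples {
    public static void startSinks(Dataset<Row> stream) throws Exception {
        // Console sink: prints each micro-batch to stdout (useful for debugging).
        StreamingQuery console = stream.writeStream()
                .format("console")
                .outputMode("append")
                .start();

        // File sink: writes files to a directory; a checkpoint location is required.
        StreamingQuery file = stream.writeStream()
                .format("parquet") // also json/csv/orc/text
                .option("path", "/tmp/out")                // hypothetical path
                .option("checkpointLocation", "/tmp/ckpt") // hypothetical path
                .start();

        // Memory sink: keeps results in an in-memory table queryable via Spark SQL.
        StreamingQuery memory = stream.writeStream()
                .format("memory")
                .queryName("result_table") // hypothetical table name
                .outputMode("append")
                .start();
    }
}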
ForeachWriter implementation:
Take writing to Redis as an example.
package com.dx.streaming.producer;

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class TestForeachWriter extends ForeachWriter<Row> {
    private static final long serialVersionUID = 1801843595306161029L;

    // Static, so all TestForeachWriter instances on an executor share one pool.
    public static JedisPool jedisPool;
    public Jedis jedis;

    static {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(20);
        config.setMaxIdle(5);
        config.setMaxWaitMillis(1000);
        config.setMinIdle(2);
        config.setTestOnBorrow(false);
        jedisPool = new JedisPool(config, "127.0.0.1", 6379);
    }

    public static synchronized Jedis getJedis() {
        return jedisPool.getResource();
    }

    @Override
    public boolean open(long partitionId, long version) {
        // Borrow a connection for this partition; true means "process this partition".
        jedis = getJedis();
        return true;
    }

    @Override
    public void process(Row row) {
        // Placeholder: writes the same literal key/value for every row; see the
        // sketch after this listing for writing real fields from the row.
        jedis.set("row.key", "row.value");
    }

    @Override
    public void close(Throwable errorOrNull) {
        // Return the connection to the pool.
        jedis.close();
    }
}
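A ForeachWriter follows a fixed lifecycle: open(partitionId, version) is called once per partition for each trigger (returning true tells Spark to process that partition), process(row) is then called once per row, and close(errorOrNull) is called at the end, including on failure. The process method above only writes a literal key/value pair as a placeholder; a sketch of writing real row fields, assuming hypothetical Avro fields userId and action from userlog.avsc, might look like this:

@Override
public void process(Row row) {
    // "userId" and "action" are hypothetical field names; adapt them to your schema.
    String key = "userlog:" + String.valueOf(row.getAs("userId"));
    jedis.hset(key, "action", String.valueOf(row.getAs("action")));
}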
Example of using the ForeachWriter in Structured Streaming:
package com.dx.streaming.producer;

import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import com.databricks.spark.avro.SchemaConverters;

public class TestConsumer {
    // Note the escaped backslashes; unescaped ones would not compile.
    private static final String avroFilePath = "D:\\Java_Study\\workspace\\kafka-streaming-learn\\conf\\avro\\userlog.avsc";
    // private static final String avroFilePath = "/user/dx/conf/avro/userlog.avsc";
    private static final String topic = "t-my";

    public static void main(String[] args) throws Exception {
        String appName = "Test Avro";
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName(appName);
        SparkSession sparkSession = SparkSession.builder().config(conf).getOrCreate();

        Map<String, String> kafkaOptions = new HashMap<String, String>();
        kafkaOptions.put("kafka.bootstrap.servers", "192.168.0.121:9092");

        // SchemaUtil and AvroParserUDF are helper classes defined elsewhere in this project.
        Schema schema = SchemaUtil.getAvroSchemaFromHDFSFile(avroFilePath);
        AvroParserUDF udf = new AvroParserUDF(avroFilePath);
        StructType type = (StructType) SchemaConverters.toSqlType(schema).dataType();
        sparkSession.udf().register("deserialize", udf, DataTypes.createStructType(type.fields()));

        // Read the raw Kafka value bytes and turn them into columns via the UDF.
        Dataset<Row> stream = sparkSession.readStream()
                .format("kafka")
                .options(kafkaOptions)
                .option("subscribe", topic)
                .option("startingOffsets", "earliest")
                .load()
                .select("value").as(Encoders.BINARY())
                .selectExpr("deserialize(value) as row")
                .select("row.*");

        stream.printSchema();

        // Write each row to Redis through the ForeachWriter defined above.
        StreamingQuery query = stream.writeStream()
                .foreach(new TestForeachWriter())
                .outputMode("update")
                .start();

        try {
            // Block until the query terminates (a second awaitAnyTermination() would be redundant).
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }
}
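To confirm end to end that rows are reaching Redis, a small standalone check can be run against the same instance. This is a minimal sketch assuming the hypothetical userlog:* key layout from the process() sketch earlier:

import redis.clients.jedis.Jedis;

public class CheckRedis {
    public static void main(String[] args) {
        // Connect to the same Redis instance the streaming job writes to.
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            // KEYS is fine for a manual spot check (avoid it on large production datasets).
            for (String key : jedis.keys("userlog:*")) {
                System.out.println(key + " -> " + jedis.hgetAll(key));
            }
        }
    }
}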