【spark】sparkSQL自定义批处理输入输出

背景

Spark SQL 内置了多种数据源（输入和输出方式）。

但很多时候还需要添加其他的输入输出方式，例如 HBase、Socket、WebSocket、Excel 等。

输出

以websocket输出为例

最终发送消息的调用代码如下：

df.write
  .format("cn.zwy.websocket")           // Spark resolves this to cn.zwy.websocket.DefaultSource
  .option("uri", "ws://localhost:8888") // target WebSocket endpoint, read by DefaultSource
  .save()

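Spark 会在 format 中填写的包 cn.zwy.websocket 下寻找名为 DefaultSource 的类，因此在该包下创建一个 DefaultSource 类。（如果想直接使用 shortName 返回的短名，即 format("websocket")，还需要在 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister 文件中登记该类的全限定名。）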

package cn.zwy.websocket 

import cn.zwy.common.JsonUtil
import cn.zwy.WebSocketClient
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

import java.net.URI
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

/**
 * Write-only Spark SQL batch data source that sends every DataFrame row to a
 * WebSocket server as a JSON message.
 *
 * Spark resolves `format("cn.zwy.websocket")` to this class because it is named
 * `DefaultSource` inside that package. The short name returned by [[shortName]]
 * only works if the class is also registered as a
 * `org.apache.spark.sql.sources.DataSourceRegister` service.
 */
class DefaultSource extends CreatableRelationProvider with DataSourceRegister {

  /**
   * Sends each row of `data` to the WebSocket endpoint given by the `uri` option.
   *
   * The URI is parsed on the driver, so a missing or malformed option fails the
   * write before any task starts. Each partition opens its own client, sends one
   * JSON message per row, and always closes the client, even if the partition
   * fails. Rows that fail to encode or send are skipped (best effort).
   *
   * @param sqlContext not used
   * @param mode       not used: every save sends all rows regardless of the SaveMode
   * @param parameters writer options; must contain `uri`
   * @param data       rows to send
   * @return a placeholder relation whose members all throw UnsupportedOperationException
   * @throws IllegalArgumentException    if the `uri` option is missing
   * @throws java.net.URISyntaxException if `uri` is not a valid URI
   */
  override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {

    // Validate and parse on the driver; java.net.URI is Serializable, so it ships to tasks.
    val uri = new URI(parameters.getOrElse("uri",
      throw new IllegalArgumentException("Option 'uri' is required for the websocket data source.")))
    // All rows of a DataFrame share its schema, so resolve the column names once.
    val fieldNames = data.schema.fieldNames

    data.foreachPartition(partition => {
      val client = new WebSocketClient(uri)
      try {
        // NOTE(review): if connectBlocking() reports failure through its return value
        // (as org.java_websocket's client does), that value is ignored here and every
        // send below fails and is skipped — confirm cn.zwy.WebSocketClient's contract.
        client.connectBlocking()
        partition.foreach(row => {
          try {
            client.send(JsonUtil.Encode(row.getValuesMap[Any](fieldNames).asJava))
          } catch {
            // Best effort: drop the row. NonFatal lets InterruptedException and fatal
            // errors propagate so task cancellation and JVM failures are not hidden.
            case NonFatal(_) =>
          }
        })
      } finally {
        // Runs even when connecting or iterating the partition throws.
        client.close()
      }
    })

    // The write path has nothing meaningful to expose, so every member throws.
    new BaseRelation {
      override def sqlContext: SQLContext = unsupportedException
      override def schema: StructType = unsupportedException
      override def needConversion: Boolean = unsupportedException
      override def sizeInBytes: Long = unsupportedException
      override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
      private def unsupportedException =
        throw new UnsupportedOperationException("BaseRelation from web socket write " +
          "operation is not usable.")
    }
  }

  /** Alias usable as `format("websocket")` once registered as a DataSourceRegister service. */
  override def shortName(): String = "websocket"
}

写出数据需要实现 CreatableRelationProvider 接口，

而对应的读取数据的接口则是 RelationProvider 接口。

至于 createRelation 最后返回的 BaseRelation：

// Placeholder relation returned after the write: the write path has nothing
// meaningful to expose, so every member throws UnsupportedOperationException.
new BaseRelation {
  private def notUsable: Nothing =
    throw new UnsupportedOperationException("BaseRelation from web socket write " +
      "operation is not usable.")

  override def sqlContext: SQLContext = notUsable
  override def schema: StructType = notUsable
  override def needConversion: Boolean = notUsable
  override def sizeInBytes: Long = notUsable
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = notUsable
}

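这部分没有细究，是照搬 Spark 中 Kafka 数据源的写法：写入完成后返回一个"不可用"的 BaseRelation，其所有成员都会抛出 UnsupportedOperationException。GitHub 上另一些项目（以及 Spark 自带的 JDBC 数据源）则在写入完成后调用 RelationProvider 的 createRelation(sqlContext, parameters) 并返回其结果，相当于写完之后再把数据读取一遍。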

钦此!

原文地址:https://www.cnblogs.com/zhouwenyang/p/14335605.html