Flink学习(十一) Sink到Elasticsearch

导入依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
package com.wyh.streamingApi.sink

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

//温度传感器读数样例类
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object Sink2ES {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //Source操作
    val inputStream = env.readTextFile("F:\flink-study\wyhFlinkSD\data\sensor.txt")

    //Transform操作
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("192.168.230.30", 9200))

    //创建一个ES Sink的builder
    val esSinkBuilder: ElasticsearchSink.Builder[SensorReading] = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading] {
        override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          println("saving data:" + t)

          //包装成一个Map或者JsonObject格式
          val json = new util.HashMap[String, String]()

          json.put("sensor_id", t.id)
          json.put("timestamp", t.timestamp.toString)
          json.put("temperature", t.temperature.toString)

          //创建indexRequest准备发送数据
          val indexRequest = Requests.indexRequest()
            .index("sensor")
            .`type`("readingdata")
            .source(json)

          //利用requestIndexer进行发送请求,写入数据
          requestIndexer.add(indexRequest)
          println("data 写入完成。。。")
        }
      }
    )


    //Sink操作
    dataStream.addSink(esSinkBuilder.build())

    env.execute("sink ES test")
  }

}

启动ES

 启动kibana

 运行

 查看结果

原文地址:https://www.cnblogs.com/wyh-study/p/12924829.html