Flink之Sink(文件、Kafka、Redis、Es、Mysql)

提醒:

1、连接kafka,一定要注意依赖版本,否则即使程序启动没有报错,也不会接受到数据
2、kafka依赖除了Flink和kafka的,建议加上kafka-client的依赖,对应版本也需一致
-----------------------------------------------------------------------
1、连接es,需要使用flink-connector-elasticsearch6_2.11、elasticsearch-rest-high-level-client、elasticsearch-rest-client、elasticsearch,否则无法启动

数据格式

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9

##########保存至文件##############

1、处理主类

package sink

import com.yangwj.api.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
/**
 * @author yangwj
 * @date 2021/1/6 21:17
 * @version 1.0
 */
object FileSink {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputFile:String = "G:\Java\Flink\guigu\flink\src\main\resources\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })
    //1、过时方法
    val savePath = "G:\Java\Flink\guigu\flink\src\main\resources\sensorToFile"
    dataStream.writeAsCsv(savePath)

    //2、分布式方法
    val saveDistributePath = "G:\Java\Flink\guigu\flink\src\main\resources\saveDistributePath"
    dataStream.addSink(StreamingFileSink.forRowFormat(
      new Path(saveDistributePath),
      new SimpleStringEncoder[SensorReading]())
      .build()
    )
    env.execute("FileSink Test")

  }

}

##########保存至Es##############

1、依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>6.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.6.0</version>
        </dependency>

2、处理主类

package sink

import java.util

import com.yangwj.api.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
/**
 * @author yangwj
 * @date 2021/1/6 22:05
 * @version 1.0
 */
object EsSink {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputFile:String = "G:\Java\Flink\guigu\flink\src\main\resources\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost",9200))

    //自定义写入Es的function
    val myEsSinkFunc: ElasticsearchSinkFunction[SensorReading] = new ElasticsearchSinkFunction[SensorReading] {

      //process 数据处理方法,并发送至Es
      override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        //包装一个Map作为data source
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("id",t.id)
        dataSource.put("temperature",t.temperature.toString)
        dataSource.put("ts",t.timestamp.toString)

        //创建indexRequest,用于发送http请求
        val request: IndexRequest = Requests.indexRequest().index("sensor").`type`("_doc").source(dataSource)

        requestIndexer.add(request)
      }

    }
esLink.setBulkFlushMaxActions(1)  //表示将buffer中的数据sink到es
dataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHosts,myEsSinkFunc).build()) 
env.execute(
"Es Sink Test") } }

##########保存至Kafka##############

1、依赖(注意:一定要注意版本的问题,否则程序启动没有错误,也接受不到kafka的数据

 <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
      <version>1.10.1</version>
 </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>

2、处理主类

package sink

import java.util.Properties

import com.yangwj.api.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer010, FlinkKafkaProducer011}
/**
 * @author yangwj
 * @date 2021/1/6 21:32
 * @version 1.0
 */
object KafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 从kafka读取数据
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    val stream = env.addSource( new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties) )

    // 先转换成样例类类型(简单转换操作)
    val dataStream = stream
      .map( data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble).toString
      } )

    dataStream.addSink( new FlinkKafkaProducer011[String]("localhost:9092", "sinktest", new SimpleStringSchema()) )

    env.execute("kafka sink test")


  }
}

##########保存至Mysql##############

1、依赖

 <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.44</version>
 </dependency>

2、处理主类

package sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.yangwj.api.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
/**
 * @author yangwj
 * @date 2021/1/6 22:27
 * @version 1.0
 * 由于官网没有提供,无法保证数据一致性
 */
object MysqlSink {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputFile:String = "G:\Java\Flink\guigu\flink\src\main\resources\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    dataStream.addSink( new MyJdbcSinkFunc() )

    env.execute("mysql Sink Test")
  }
}
class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading]{
  // 定义连接、预编译语句
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
    insertStmt = conn.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  override def invoke(value: SensorReading): Unit = {
    // 先执行更新操作,查到就更新
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    updateStmt.execute()
    // 如果更新没有查到数据,那么就插入
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}

##########保存至Redis##############

1、依赖

    <dependency>
      <groupId>org.apache.bahir</groupId>
      <artifactId>flink-connector-redis_2.11</artifactId>
      <version>1.0</version>
    </dependency>

2、处理主类

package sink

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
/**
 * @author yangwj
 * @date 2021/1/6 21:48
 * @version 1.0
 */
object RedisSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputFile:String = "G:\Java\Flink\guigu\flink\src\main\resources\sensor.txt"
    val input: DataStream[String] = env.readTextFile(inputFile)

    val dataStream = input.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    //1、redis配置
    val config: FlinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder().setHost("localhost")
      .setPort(6379)
      .build()

    dataStream.addSink(new RedisSink[SensorReading](config , new MyRedisMapper()))
  }
}

// 2、定义一个RedisMapper
class MyRedisMapper extends RedisMapper[SensorReading]{
  // 定义保存数据写入redis的命令,HSET 表名 key value
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "sensor_temp")
  }
  // 将温度值指定为value
  override def getValueFromData(data: SensorReading): String = data.temperature.toString

  // 将id指定为key
  override def getKeyFromData(data: SensorReading): String = data.id
}
原文地址:https://www.cnblogs.com/ywjfx/p/14244015.html