flink-----实时项目---day07-----1.Flink的checkpoint原理分析 2. 自定义两阶段提交sink(MySQL) 3 将数据写入Hbase(使用幂等性结合at least Once实现精确一次性语义) 4 ProtoBuf

1.Flink中exactly once实现原理分析

  生产者从kafka拉取数据以及消费者往kafka写数据都需要保证exactly once。目前flink中支持exactly once的source不多,有kafka source;能实现exactly once的sink也不多,如kafka sink、streamingFileSink,其都要开启checkpoint才能实现exactly once。接下来以FlinkKafkaProducer为例,深入研究其源代码,从而理解flink中的exactly once(精准一次性语义)是怎么实现的。

1.1 大致流程图(也叫分两阶段提交原理)

 1. JobManager定期(通过CheckpointCodinator)向各个包含state的subTask发起checkpoint的请求

 2. subTask将各自的state写入到相应的statebackend,一个资源槽对应一个文件,其中各个subTask的state写入这个文件中

 3. 各个subTask向JobManager发送checkpoint成功的消息

 4. 当所有subTask都发送了checkpoint成功的消息后,jobManager会向所有实现了checkpoint的subTask发送成功的消息

 5. subTask往kafka写数据,并且向Kafka提交事务()

注意:为了保证一个流水线(pipeline)上的operrator state和keyedstate数据一致,flink引入了barrier机制,即在jobmanager和taskManager间设置一个barrier,相当于节流,保证在checkpoint时,source不能在读取数据

问题:kafka涉及到生产者往里面写数据一个事务,以及消费者读取数据一个事务,这两个事物间有什么联系?

1.2 源码解析

(1)首先看FlinkKafkaProducer类,可以发现其继承了TwoPhaseCommitSinkFunction

 (2)TwoPhaseCommitSinkFunction是所有要实现一次性语义的SinkFunction的一个比较推荐的基类,其实现了两个重要的接口,分别为:CheckpointedFunction, CheckpointListener

  • CheckpointedFunction接口

 此接口中包含两个方法,分别为snapshotState方法、initializeState方法,源代码如下

public interface CheckpointedFunction {

    /**
     * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
     * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
     * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
     *
     * @param context the context for drawing a snapshot of the operator
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * This method is called when the parallel function instance is created during distributed
     * execution. Functions typically set up their state storing data structures in this method.
     *
     * @param context the context for initializing the operator
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;

}
View Code

  其中snapshotState方法是用checkpoint时,拍快照,其能将state持久化到statebackend。这里面存了一些transactionID、subTask编号、以及kafka的相关信息(用来写数据)。若是checkpoint成功了,但是subTask并没有成功将数据写入kafka,则会通过这个方法恢复原先最近的state进行恢复,然后继续

  initializeState方法可以用来恢复state,解释可能以前将state持久化到了statebackend,但并没有将数据成功写入kafka,则可以ton过这个方法恢复最近的state,然后将数据继续往kafka写数据。

  • CheckpointListener接口

此接口中包含一个notifyCheckpointComplete方法

源码如下

/**
 * This interface must be implemented by functions/operations that want to receive
 * a commit notification once a checkpoint has been completely acknowledged by all
 * participants.
 */
@PublicEvolving
public interface CheckpointListener {

    /**
     * This method is called as a notification once a distributed checkpoint has been completed.
     * 
     * Note that any exception during this method will not cause the checkpoint to
     * fail any more.
     * 
     * @param checkpointId The ID of the checkpoint that has been completed.
     * @throws Exception
     */
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}
View Code

notifyCheckpointComplete方法什么时候被调用呢?所有分区的subTask向JobManager相应checkpoint后才会被调用,即告知各个subTask,这次checkpoint成功了,可以进行下一步的操作了,该方法源码如下:

    @Override
    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
        // the following scenarios are possible here
        //
        //  (1) there is exactly one transaction from the latest checkpoint that
        //      was triggered and completed. That should be the common case.
        //      Simply commit that transaction in that case.
        //
        //  (2) there are multiple pending transactions because one previous
        //      checkpoint was skipped. That is a rare case, but can happen
        //      for example when:
        //
        //        - the master cannot persist the metadata of the last
        //          checkpoint (temporary outage in the storage system) but
        //          could persist a successive checkpoint (the one notified here)
        //
        //        - other tasks could not persist their status during
        //          the previous checkpoint, but did not trigger a failure because they
        //          could hold onto their state and could successfully persist it in
        //          a successive checkpoint (the one notified here)
        //
        //      In both cases, the prior checkpoint never reach a committed state, but
        //      this checkpoint is always expected to subsume the prior one and cover all
        //      changes since the last successful one. As a consequence, we need to commit
        //      all pending transactions.
        //
        //  (3) Multiple transactions are pending, but the checkpoint complete notification
        //      relates not to the latest. That is possible, because notification messages
        //      can be delayed (in an extreme case till arrive after a succeeding checkpoint
        //      was triggered) and because there can be concurrent overlapping checkpoints
        //      (a new one is started before the previous fully finished).
        //
        // ==> There should never be a case where we have no pending transaction here
        //

        Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
        Throwable firstError = null;

        while (pendingTransactionIterator.hasNext()) {
            Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
            Long pendingTransactionCheckpointId = entry.getKey();
            TransactionHolder<TXN> pendingTransaction = entry.getValue();
            if (pendingTransactionCheckpointId > checkpointId) {
                continue;
            }

            LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

            logWarningIfTimeoutAlmostReached(pendingTransaction);
            try {
                commit(pendingTransaction.handle);
            } catch (Throwable t) {
                if (firstError == null) {
                    firstError = t;
                }
            }

            LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

            pendingTransactionIterator.remove();
        }

        if (firstError != null) {
            throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
                firstError);
        }
    }
View Code

注意,该方法除了提醒个subTask此次checkpoint成功了外,还会提交事务,具体见源码如下(为该方法源码的一部分):

 FlinkKafkaProducer中的commit方法

    @Override
    protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try {
                transaction.producer.commitTransaction();
            } finally {
                recycleTransactionalProducer(transaction.producer);
            }
        }
    }
View Code

  若是事务提交失败后,该怎么办呢?没关系,事务提交失败后,会根据重启策略重启,并调用initializeState方法恢复先前最近的一个state,继续往kafka写数据,提交事务,再次提交事务时,就不是调用commit方法了,而是调用FlinkKafkaProducer中的recoverAndCommit方法(这块也可能是preCommit方法,自己还没完全看懂源码),先恢复数据再commit事务,源码如下

@Override
    protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
        if (transaction.isTransactional()) {
            try (
                FlinkKafkaInternalProducer<byte[], byte[]> producer =
                    initTransactionalProducer(transaction.transactionalId, false)) {
                producer.resumeTransaction(transaction.producerId, transaction.epoch);
                producer.commitTransaction();
            } catch (InvalidTxnStateException | ProducerFencedException ex) {
                // That means we have committed this transaction before.
                LOG.warn("Encountered error {} while recovering transaction {}. " +
                        "Presumably this transaction has been already committed before",
                    ex,
                    transaction);
            }
        }
    }
View Code

注意:这里可以保证checkpoint成功,以及事务提交成功,但是没法保证它俩在一起同时成功。但这也没关系,就算checkpoint成功了,事务没成功也没关系。事务没成功会回滚,它会从statebackend中恢复数据,然后再向kafka中写数据,提交事务。

2 自定义两阶段提交sink实例

  自定义两阶段提交sink,其面向的存储系统一定要支持事务,比如mysq,0.11版以后的kafka。简单来说,自定义两阶段提交sink就是继承TwoPhaseCommitSinkFunction类,然后重写里面的方法,具体见下面的例子

MySQL分两阶段提交的Sink

druid连接池

package cn._51doit.flink.day11;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

public class DruidConnectionPool {

    private transient static DataSource dataSource = null;

    private transient static Properties props = new Properties();

    static {

        props.put("driverClassName", "com.mysql.jdbc.Driver");
        props.put("url", "jdbc:mysql://172.16.200.101:3306/bigdata?characterEncoding=UTF-8");
        props.put("username", "root");
        props.put("password", "123456");
        try {
            dataSource = DruidDataSourceFactory.createDataSource(props);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    private DruidConnectionPool() {
    }

    public static Connection getConnection() throws SQLException {
        return dataSource.getConnection();
    }
}
View Code

MySqlTwoPhaseCommitSinkFunction

package cn._51doit.flink.day11;


import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<Tuple2<String, Integer>, MySqlTwoPhaseCommitSink.ConnectionState, Void> {


    public MySqlTwoPhaseCommitSink() {
        super(new KryoSerializer<>(MySqlTwoPhaseCommitSink.ConnectionState.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected MySqlTwoPhaseCommitSink.ConnectionState beginTransaction() throws Exception {

        System.out.println("=====> beginTransaction... ");
        //Class.forName("com.mysql.jdbc.Driver");
        //Connection conn = DriverManager.getConnection("jdbc:mysql://172.16.200.101:3306/bigdata?characterEncoding=UTF-8", "root", "123456");
        Connection connection = DruidConnectionPool.getConnection();
        connection.setAutoCommit(false);
        return new ConnectionState(connection);

    }


    @Override
    protected void invoke(MySqlTwoPhaseCommitSink.ConnectionState connectionState, Tuple2<String, Integer> value, Context context) throws Exception {
        Connection connection = connectionState.connection;
        PreparedStatement pstm = connection.prepareStatement("INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?");
        pstm.setString(1, value.f0);
        pstm.setInt(2, value.f1);
        pstm.setInt(3, value.f1);
        pstm.executeUpdate();
        pstm.close();

    }

    @Override
    protected void preCommit(MySqlTwoPhaseCommitSink.ConnectionState connectionState) throws Exception {
        System.out.println("=====> preCommit... " + connectionState);
    }

    @Override
    protected void commit(MySqlTwoPhaseCommitSink.ConnectionState connectionState) {
        System.out.println("=====> commit... ");
        Connection connection = connectionState.connection;
        try {
            connection.commit();
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException("提交事物异常");
        }
    }

    @Override
    protected void abort(MySqlTwoPhaseCommitSink.ConnectionState connectionState) {
        System.out.println("=====> abort... ");
        Connection connection = connectionState.connection;
        try {
            connection.rollback();
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException("回滚事物异常");
        }
    }

    static class ConnectionState {

        private final transient Connection connection;

        ConnectionState(Connection connection) {
            this.connection = connection;
        }

    }


}
View Code

 3 将数据写入Hbase

  使用hbase的幂等性结合at least Once(flink中state能恢复,在两次checkpoint间可能会有重复读取数据的情况)实现精确一次性语义

HBaseUtil

package cn._51doit.flink.day11;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

/**
 * Hbase的工具类,用来创建Hbase的Connection
 */
public class HBaseUtil {
    /**
     * @param zkQuorum zookeeper地址,多个要用逗号分隔
     * @param port     zookeeper端口号
     * @return
     */
    public static Connection getConnection(String zkQuorum, int port) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkQuorum);
        conf.set("hbase.zookeeper.property.clientPort", port + "");
        Connection connection = ConnectionFactory.createConnection(conf);
        return connection;
    }
}
View Code

MyHbaseSink

package cn._51doit.flink.day11;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

import java.util.ArrayList;
import java.util.List;

public class MyHbaseSink extends RichSinkFunction<Tuple2<String, Double>> {

    private transient  Connection connection;

    private transient Integer maxSize = 1000;

    private transient Long delayTime = 5000L;

    private transient Long lastInvokeTime;

    private transient List<Put> puts = new ArrayList<>(maxSize);

    public MyHbaseSink() {}

    public MyHbaseSink(Integer maxSize, Long delayTime) {
        this.maxSize = maxSize;
        this.delayTime = delayTime;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

       ParameterTool params = (ParameterTool) getRuntimeContext()
               .getExecutionConfig().getGlobalJobParameters();

        //创建一个Hbase的连接
        connection = HBaseUtil.getConnection(
                params.getRequired("hbase.zookeeper.quorum"),
                params.getInt("hbase.zookeeper.property.clientPort", 2181)
        );

        lastInvokeTime = System.currentTimeMillis();
    }

    @Override
    public void invoke(Tuple2<String, Double> value, Context context) throws Exception {

        String rk = value.f0;
        Put put = new Put(rk.getBytes());
        put.addColumn("data".getBytes(), "order".getBytes(), value.f1.toString().getBytes());

        puts.add(put);


        //使用ProcessingTime
        long currentTime = System.currentTimeMillis();

        //加到一个集合中
        if(puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) {

            //获取一个HbaseTable
            Table table = connection.getTable(TableName.valueOf("myorder"));

            table.put(puts);

            puts.clear();

            lastInvokeTime = currentTime;

            table.close();
        }

    }

    @Override
    public void close() throws Exception {
        connection.close();
    }
}
View Code

 4 ProtoBuf

  protoBuf是一种序列化机制,数据存储还是二进制,其特点是序列化、反序列化快,占用空间小(相比json而言,是它的1/3)、跨平台、跨语言。

4.1 protobuf的使用测试

(1)创建一个maven工程

(2)导入pom依赖,具体内容见下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>protobuf-bean</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.7.1</version>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>3.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.6.2</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>

                <configuration>
                    <protocArtifact>
                        com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}
                    </protocArtifact>
                    <pluginId>grpc-java</pluginId>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>

</project>
View Code

(3)在main目录下创建一个proto文件夹,在这个文件夹下编辑相应的xxx.proto文件,具体如下

syntax = "proto3";
option java_package = "cn._51doit.proto";
option java_outer_classname = "OrderProto";

message Order {
    int32 id = 1;
    string time = 2;
    double money  = 3;
}
View Code

(4)在maven的plugins中会有个protobuf插件,点击里面的protobuf.compile,即可在项目中的target目录下生成相应的protobuf bean文件(支持多种语言的schema信息)

(5)将得到的proto bean移到自己想要的目录中即可

 此测试就是将json数据转成protoBuf bean格式数据,然后在将其序列化输出,以及反序列化至bean输出

OrderProtoTest

package cn._51doit.test;

import cn._51doit.proto.OrderProto;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;

public class OrderProtoTest {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        String json = "{"id": 100, "time": "2020-07-01", "money": 66.66}";

        //使用工具类生成一个类
        OrderProto.Order.Builder bean = OrderProto.Order.newBuilder();

        //将数据拷贝的bean中
        JsonFormat.parser().merge(json, bean);

        bean.setId(666);
        bean.setTime("2019-10-18");
        bean.setMoney(888.88);
        //序列化转成二进制
        //bean -> byte数组
        byte[] bytes = bean.build().toByteArray();

        System.out.println("二进制:" + bytes);

        //反序列化
        //二进制数组转成bean
        OrderProto.Order order = OrderProto.Order.parseFrom(bytes);
        System.out.println("对象格式:" + order);
    }
}
View Code

4.2 将数据以ProtoBuf的二进制形式发送到Kafka

DataToKafka
package cn._51doit.test;

import cn._51doit.proto.DataBeanProto;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class DataToKafka {
    public static void main(String[] args) {
        // 1 配置参数
        Properties props = new Properties();
        //连接kafka节点
        props.setProperty("bootstrap.servers", "feng05:9092,feng06:9092,feng07:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        String topic = "dataproto";

        // 2 kafka的生产者
        KafkaProducer<String,  byte[]> producer = new KafkaProducer<String,  byte[]>(props);
        DataBeanProto.DataBean.Builder bean = DataBeanProto.DataBean.newBuilder();
        DataBeanProto.DataBeans.Builder list = DataBeanProto.DataBeans.newBuilder();

        for (int i = 1; i <= 100; i++) {
            //往bean中设置属性
            bean.setId(i);
            bean.setTitle("doit-" + i);
            bean.setUrl("www.51doit.cn");
            //将bean追加到list中
            list.addDataBean(bean);
            //清空原来分组的数据
            bean.clear();

            if(list.getDataBeanCount() == 10) {
                //将beans的集合转成protobuf的二进制
                byte[] bytes = list.build().toByteArray();
                ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, bytes);
                producer.send(record); //一次发送10条
                producer.flush();
                list.clear();
            }
        }
        System.out.println("message send success");
        // 释放资源
        producer.close();
    }

}
View Code

4.3  Flume的KafkaChannel整合kafka序列化器

  需求:(1)在kafka中定义序列化器,在数据写入kafka前,将之转成对应的二进制存入kafka

     (2)Flink从Kafka中拉取刚存入相应格式的二进制数据,转成ProtoBuf的Bean

(1)kafka序列化器的实现

  大致思路就是首先获取一个protoBuf bean,然后定义一个序列化器,实现一个Serializer接口,在里面重写serialize方法,具体逻辑见下面代码。将该代码打包,放到flume的lib文件夹中,注意需要将flume的lib中protobuf-java-2.5.0.jar注释或者删除掉。

KafkaProtoBufSerializer

package cn._51doit.test;


import cn._51doit.proto.UserProto;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class KafkaProtoBufSerializer implements Serializer<byte[]> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public byte[] serialize(String topic, byte[] data) {
        // 将source传给channel的数据转成ProtoBuf的二进制
        //line是一个json
        String line = new String(data);
        UserProto.User.Builder bean = UserProto.User.newBuilder();
        //使用工具类将JSON的数据的数据set到bean中
        try {
            JsonFormat.parser().merge(line, bean);
        } catch (InvalidProtocolBufferException e) {
            return null;
        }
        return bean.build().toByteArray(); //返回的是ProtoBuf的二进制
    }

    @Override
    public byte[] serialize(String topic, Headers headers, byte[] data) {
        return new byte[0];
    }

    @Override
    public void close() {

    }
}
View Code

(2)Flink的Kafka反序列化器的实现

注意,此处除了要设置反序列化,即将kafka中确定topic中的protoBuf格式的二进制数据序列化成protoBuf的bean,还要指定bean的序列化规则(注册自定义的序列化类),这样flink处理该数据时才能进行网络传输

 DataBeanProto(bean,跨语言)

 使用4.1方法生成

DataBeansDeserializer反序列化器

package cn._51doit.flink.day11;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

/**
 * 自定义的Flink反序列化器
 */
public class DataBeansDeserializer implements DeserializationSchema<DataBeanProto.DataBeans> {

    //反序列化
    @Override
    public DataBeanProto.DataBeans deserialize(byte[] message) throws IOException {
        return DataBeanProto.DataBeans.parseFrom(message);
    }


    @Override
    public boolean isEndOfStream(DataBeanProto.DataBeans nextElement) {
        return false;
    }

    @Override
    public TypeInformation<DataBeanProto.DataBeans> getProducedType() {
        return TypeInformation.of(DataBeanProto.DataBeans.class);
    }
}
View Code

PBSerializer序列化器

package cn._51doit.flink.day11;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.protobuf.Message;

import java.lang.reflect.Method;
import java.util.HashMap;

public class PBSerializer extends Serializer<Message> {

        /* This cache never clears, but only scales like the number of
         * classes in play, which should not be very large.
         * We can replace with a LRU if we start to see any issues.
         */
        final protected HashMap<Class, Method> methodCache = new HashMap<Class, Method>();

        /**
         * This is slow, so we should cache to avoid killing perf:
         * See: http://www.jguru.com/faq/view.jsp?EID=246569
         */
        protected Method getParse(Class cls) throws Exception {
            Method meth = methodCache.get(cls);
            if (null == meth) {
                meth = cls.getMethod("parseFrom", new Class[]{ byte[].class });
                methodCache.put(cls, meth);
            }
            return meth;
        }

        //序列化
        @Override
        public void write(Kryo kryo, Output output, Message mes) {
            byte[] ser = mes.toByteArray();
            output.writeInt(ser.length, true);
            output.writeBytes(ser);
        }

        //反序列化
        @Override
        public Message read(Kryo kryo, Input input, Class<Message> pbClass) {
            try {
                int size = input.readInt(true);
                byte[] barr = new byte[size];
                input.readBytes(barr);
                return (Message)getParse(pbClass).invoke(null, barr);
            } catch (Exception e) {
                throw new RuntimeException("Could not create " + pbClass, e);
            }
        }
    }
View Code

测试类

ProtoBufDemo

package cn._51doit.flink.day11;

import cn._51doit.flink.day10.FlinkUtilsV2;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;

public class ProtoBufDemo {

    public static void main(String[] args) throws Exception{

        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);

        DataStream<DataBeanProto.DataBeans> dataBeansStream = FlinkUtilsV2.createKafkaDataStream(parameters, "dataproto", "gid", DataBeansDeserializer.class);
        //注册自定义的序列化类
        FlinkUtilsV2.getEnv().getConfig().registerTypeWithKryoSerializer(DataBeanProto.DataBeans.class, PBSerializer.class);
        FlinkUtilsV2.getEnv().getConfig().registerTypeWithKryoSerializer(DataBeanProto.DataBean.class, PBSerializer.class);

        SingleOutputStreamOperator<DataBeanProto.DataBean> dataBeanStream = dataBeansStream.flatMap(
                new FlatMapFunction<DataBeanProto.DataBeans, DataBeanProto.DataBean>() {
                    @Override
                    public void flatMap(DataBeanProto.DataBeans list, Collector<DataBeanProto.DataBean> out) throws Exception {

                        for (DataBeanProto.DataBean dataBean : list.getDataBeanList()) {
                            out.collect(dataBean);
                        }
                    }
                });

        dataBeanStream.print();

        FlinkUtilsV2.getEnv().execute();

    }
}
View Code
 
原文地址:https://www.cnblogs.com/jj1106/p/13214614.html