Data heterogenization at big-data scale: streaming MySQL into HBase in real time (with canal, Kafka, and Spark Streaming)

 

Background: our company's offline ETC machine room runs a Mycat cluster that backs the order system. We now need to heterogenize the data: write the MySQL data into another store in (near) real time, to serve read traffic and for data archival.
Technology choices: binlog parser: Alibaba's open-source canal; message middleware: Kafka; streaming framework: Spark Streaming.
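canal runs as a standalone server that masquerades as a MySQL replica and pulls the binlog. A minimal instance configuration sketch (the host, credentials, and filter regex below are placeholders for your environment):

## conf/example/instance.properties (illustrative values)
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
## capture only the table being synchronized
canal.instance.filter.regex=databaseName\\.tableName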
On to the code.
canal parses the MySQL binlog and writes to Kafka in real time:

package kafka;
 
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.lang.SystemUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutionException;
 
 
public class AbstractCanalClient {
 
    protected final static Logger logger = LoggerFactory
            .getLogger(AbstractCanalClient.class);
    protected static final String SEP = SystemUtils.LINE_SEPARATOR;
    protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    protected volatile boolean running = false;
    protected Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
 
        public void uncaughtException(Thread t, Throwable e) {
            logger.error("parse events has an error", e);
        }
    };
    protected Thread thread = null;
    protected CanalConnector connector;
    protected static String context_format = null;
    protected static String row_format = null;
    protected static String transaction_format = null;
    protected String destination;
    protected Producer<String, String> kafkaProducer = null;
    protected String topic;
    protected String table;
 
    static {
        context_format = SEP
                + "****************************************************" + SEP;
        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}"
                + SEP;
        context_format += "* Start : [{}] " + SEP;
        context_format += "* End : [{}] " + SEP;
        context_format += "****************************************************"
                + SEP;
 
        row_format = SEP
                + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms"
                + SEP;
 
        transaction_format = SEP
                + "================> binlog[{}:{}] , executeTime : {} , delay : {}ms"
                + SEP;
 
    }
 
    public AbstractCanalClient(String destination) {
        this(destination, null);
    }
 
    public AbstractCanalClient(String destination, CanalConnector connector) {
        this(destination, connector, null);
    }
 
    public AbstractCanalClient(String destination, CanalConnector connector,
                               Producer<String, String> kafkaProducer) {
        this.connector = connector;
        this.destination = destination;
        this.kafkaProducer = kafkaProducer;
    }
 
    protected void start() {
        Assert.notNull(connector, "connector is null");
        Assert.notNull(kafkaProducer, "Kafka producer configuration is null");
        Assert.notNull(topic, "kafka topic is null");
        Assert.notNull(table,"table is null");
        thread = new Thread(new Runnable() {
 
            public void run() {
                process();
            }
        });
 
        thread.setUncaughtExceptionHandler(handler);
        thread.start();
        running = true;
    }
 
    protected void stop() {
        if (!running) {
            return;
        }
        running = false;
        if (thread != null) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                // ignore
            }
        }
 
        kafkaProducer.close();
        MDC.remove("destination");
    }
 
    protected void process() {
        int batchSize = 1024;
        while (running) {
            try {
                MDC.put("destination", destination);
                connector.connect();
                connector.subscribe("databaseName\.tableName");
                while (running) {
                    Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries without auto-ack
                    long batchId = message.getId();
                    try {
                        int size = message.getEntries().size();
                        if (batchId == -1 || size == 0) {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                            }
                        } else {
 
                            kafkaEntry(message.getEntries());
 
                        }
 
                        connector.ack(batchId); // commit the acknowledgement
                    } catch (Exception e) {
                        connector.rollback(batchId); // processing failed, roll back the batch
                    }
                }
            } catch (Exception e) {
                logger.error("process error!", e);
            } finally {
                connector.disconnect();
                MDC.remove("destination");
            }
        }
    }
 
    private void printSummary(Message message, long batchId, int size) {
        long memsize = 0;
        for (Entry entry : message.getEntries()) {
            memsize += entry.getHeader().getEventLength();
        }
 
        String startPosition = null;
        String endPosition = null;
        if (!CollectionUtils.isEmpty(message.getEntries())) {
            startPosition = buildPositionForDump(message.getEntries().get(0));
            endPosition = buildPositionForDump(message.getEntries().get(
                    message.getEntries().size() - 1));
        }
 
        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        logger.info(context_format, new Object[]{batchId, size, memsize,
                format.format(new Date()), startPosition, endPosition});
    }
 
    protected String buildPositionForDump(Entry entry) {
        long time = entry.getHeader().getExecuteTime();
        Date date = new Date(time);
        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        return entry.getHeader().getLogfileName() + ":"
                + entry.getHeader().getLogfileOffset() + ":"
                + entry.getHeader().getExecuteTime() + "("
                + format.format(date) + ")";
    }
 
    private void kafkaEntry(List<Entry> entrys) throws InterruptedException, ExecutionException {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
 
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException(
                        "ERROR ## parser of eromanga-event has an error , data:"
                                + entry.toString(), e);
            }
 
            String logfileName = entry.getHeader().getLogfileName();
            Long logfileOffset = entry.getHeader().getLogfileOffset();
            String dbName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
 
            EventType eventType = rowChage.getEventType();
            if (eventType == EventType.DELETE || eventType == EventType.UPDATE
                    || eventType == EventType.INSERT) {
                for (RowData rowData : rowChage.getRowDatasList()) {
                    String tmpstr = "";
                    if (eventType == EventType.DELETE) {
                        tmpstr = getDeleteJson(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        tmpstr = getInsertJson(rowData.getAfterColumnsList());
                    } else if (eventType == EventType.UPDATE) {
                        tmpstr = getUpdateJson(rowData.getBeforeColumnsList(),
                                rowData.getAfterColumnsList());
                    } else {
                        continue;
                    }
                    logger.info("{} {}", this.topic, tmpstr);
                    // synchronous send: .get() blocks until the broker acks the
                    // record, preserving binlog event order at some throughput cost
                    kafkaProducer.send(
                                new ProducerRecord<String, String>(this.topic,
                                        tmpstr)).get();
                }
            }
        }
    }
 
    private JSONObject columnToJson(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        return json;
    }
 
    private String getInsertJson(List<Column> columns) {
        JSONObject json = new JSONObject();
        json.put("type", "insert");
        json.put("data", this.columnToJson(columns));
        return json.toJSONString();
    }
 
    private String getUpdateJson(List<Column> befcolumns, List<Column> columns) {
        JSONObject json = new JSONObject();
        json.put("type", "update");
        // only the after-image is forwarded; befcolumns (the before-image) is
        // currently unused but kept in the signature for future diffing
        json.put("data", this.columnToJson(columns));
        return json.toJSONString();
    }
 
    private String getDeleteJson(List<Column> columns) {
        JSONObject json = new JSONObject();
        json.put("type", "delete");
        json.put("data", this.columnToJson(columns));
        return json.toJSONString();
    }
 
    protected void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            long executeTime = entry.getHeader().getExecuteTime();
            long delayTime = new Date().getTime() - executeTime;
 
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                    TransactionBegin begin = null;
                    try {
                        begin = TransactionBegin.parseFrom(entry
                                .getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException(
                                "parse event has an error , data:"
                                        + entry.toString(), e);
                    }
                    // print the transaction begin header: executing thread id and delay
                    logger.info(
                            transaction_format,
                            new Object[]{
                                    entry.getHeader().getLogfileName(),
                                    String.valueOf(entry.getHeader()
                                            .getLogfileOffset()),
                                    String.valueOf(entry.getHeader()
                                            .getExecuteTime()),
                                    String.valueOf(delayTime)});
                    logger.info(" BEGIN ----> Thread id: {}",
                            begin.getThreadId());
                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    TransactionEnd end = null;
                    try {
                        end = TransactionEnd.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException(
                                "parse event has an error , data:"
                                        + entry.toString(), e);
                    }
                    // print the transaction commit info and the transaction id
                    logger.info("----------------" + SEP);
                    logger.info(" END ----> transaction id: {}",
                            end.getTransactionId());
                    logger.info(
                            transaction_format,
                            new Object[]{
                                    entry.getHeader().getLogfileName(),
                                    String.valueOf(entry.getHeader()
                                            .getLogfileOffset()),
                                    String.valueOf(entry.getHeader()
                                            .getExecuteTime()),
                                    String.valueOf(delayTime)});
                }
 
                continue;
            }
 
            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException(
                            "parse event has an error , data:"
                                    + entry.toString(), e);
                }
 
                EventType eventType = rowChage.getEventType();
 
                logger.info(
                        row_format,
                        new Object[]{
                                entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader()
                                        .getLogfileOffset()),
                                entry.getHeader().getSchemaName(),
                                entry.getHeader().getTableName(),
                                eventType,
                                String.valueOf(entry.getHeader()
                                        .getExecuteTime()),
                                String.valueOf(delayTime)});
 
                if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
                    logger.info(" sql ----> " + rowChage.getSql() + SEP);
                    continue;
                }
 
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else {
                        // INSERT and UPDATE both print the after-image
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    }
 
    protected void printColumn(List<Column> columns) {
        for (Column column : columns) {
            StringBuilder builder = new StringBuilder();
            builder.append(column.getName() + " : " + column.getValue());
            builder.append("    type=" + column.getMysqlType());
            if (column.getUpdated()) {
                builder.append("    update=" + column.getUpdated());
            }
            builder.append(SEP);
            logger.info(builder.toString());
        }
    }
 
    public void setConnector(CanalConnector connector) {
        this.connector = connector;
    }
 
    public void setKafkaProducer(Producer<String, String> kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }
 
    public void setKafkaTopic(String topic) {
        this.topic = topic;
    }
 
    public void setFilterTable(String table) {
        this.table = table;
    }
}
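The runnable entry point below wires a canal connector and a Kafka producer into the client above and registers a shutdown hook: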
package kafka;
 
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
 
import java.net.InetSocketAddress;
import java.util.Properties;
 
 
public class ClusterCanalClient extends AbstractCanalClient {
 
    public ClusterCanalClient(String destination) {
        super(destination);
    }
 
    public static void main(String[] args) {
        String destination = null; // e.g. "example"
        String topic = null;
//        String canalhazk = null;
        String kafka = null;
        String hostname = null;
        String table = null;
 
        if (args.length != 5) {
            logger.error("usage: hostname destination topic kafkaBrokers table, " +
                    "for example: localhost example topic 192.168.0.163:9092 databaseName.tableName");
            System.exit(1);
        } else {
            hostname = args[0];
            destination = args[1];
            topic = args[2];
//            canalhazk = args[2];
            kafka = args[3];
            table = args[4];
        }
 
 
        // connect to a single canal server; the commented-out cluster connector
        // below discovers canal servers via zookeeper and supports failover if a server crashes
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(
                hostname, 11111),
                destination,
                "canal",
                "canal");
//        CanalConnector connector = CanalConnectors.newClusterConnector(
//                canalhazk, destination, "userName", "passwd");
        Properties props = new Properties();

        props.put("acks", "all"); // wait for the full ISR to acknowledge every record
        props.put("bootstrap.servers", kafka);
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432); // 32 MB
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
 
        final ClusterCanalClient clientTest = new ClusterCanalClient(destination);
        clientTest.setConnector(connector);
        clientTest.setKafkaProducer(producer);
        clientTest.setKafkaTopic(topic);
        clientTest.setFilterTable(table);
        clientTest.start();
 
 
        Runtime.getRuntime().addShutdownHook(new Thread() {
 
            public void run() {
                try {
                    logger.info("## stop the canal client");
                    clientTest.stop();
                } catch (Throwable e) {
                    logger.warn("##something goes wrong when stopping canal:
{}", ExceptionUtils.getFullStackTrace(e));
                } finally {
                    logger.info("## canal client is down.");
                }
            }
 
        });
    }
}
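Assuming the classes are packaged with their dependencies into a jar (the jar name here is illustrative), a hypothetical launch matching the argument check in main:

java -cp canal-kafka-client.jar kafka.ClusterCanalClient localhost example wms_schedule_main 192.168.0.163:9092 databaseName.tableName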

Spark Streaming writes the Kafka data into HBase:
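Each Kafka record produced by the client above is a small JSON envelope; for an insert it looks roughly like this (the field names inside data are illustrative, taken from the rowkey fields used below):

{"type":"insert","data":{"biid":"10001","chdt":"20180513","qty":"3"}}

The Spark job unwraps the data object and turns every key/value pair into an HBase column: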

package bcw.etl.syncdata
 
import java.util
 
import com.alibaba.fastjson.{JSON, JSONObject}
import example.utils.KafkaOffset_ZKManager
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.HasOffsetRanges
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
 
/**
  * @author zhuqitian
  *         2018-5-13
  *         Kafka to HBase
  *         create 'wms_schedule_main','info'
  *         ./kafka-topics.sh --create --topic wms_schedule_main --zookeeper ip:2181/kafka0.9 --partitions 3 --replication-factor 1
  */
object Kafka_to_HBase extends App {
 
  val logger: Logger = Logger.getLogger(Kafka_to_HBase.getClass)
  val conf = new SparkConf()
    .setAppName("Kafka_to_HBASE")
    .setMaster("local")
    .set("spark.streaming.kafka.maxRatePerPartition", "100000")
 
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "ip:9092",
    "auto.offset.reset" -> "smallest"
  )
 
  val topicSet = "wms_schedule_main".split(",").toSet
  val groupName = "wms_mysql_test"
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(2))
 
  val config = HBaseConfiguration.create
  config.set("hbase.zookeeper.quorum", "ip")
  config.set("hbase.zookeeper.property.clientPort", "2181")
  val conn = ConnectionFactory.createConnection(config)
  val table: Table = conn.getTable(TableName.valueOf("wms_schedule_main"))
  var puts = new util.ArrayList[Put]
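  // NOTE: conn, table and puts are driver-side objects. Mutating puts inside
  // rdd.map below only works because this job runs with setMaster("local");
  // on a real cluster each executor would mutate its own copy. See the
  // foreachPartition sketch after this listing for a cluster-safe variant.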
 
  val DStream: InputDStream[(String, String)] = KafkaOffset_ZKManager.createMyDirectKafkaStream(
    ssc, kafkaParams, topicSet, groupName)
 
  DStream.foreachRDD((rdd, btime) => {
    if (!rdd.isEmpty()) {
 
      val startTime = System.currentTimeMillis
      println(s">>>>>>>>>>>>>>>>>>>>>>start : $startTime")
 
      val message: RDD[JSONObject] = rdd.map(line => JSON.parseObject(line._2))
        .map(json => json.getJSONObject("data"))
        .filter(x => x.getString("biid") != null)
 
      val jsondata = message.map(jsondata => {
        val rowKey = jsondata.getString("biid").reverse + jsondata.getString("chdt")
        val put = new Put(rowKey.getBytes) // reversed biid avoids hot-spotting; an MD5 prefix is another option (see below)
        val columns = jsondata.keySet().toArray()

        for (key <- columns) {
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(key.toString), Bytes.toBytes(jsondata.getString(key.toString)))
        }
        puts.add(put)
      }).count()
      println(" puts size : " + puts.size())
      table.put(puts)
 
      val endTime = System.currentTimeMillis
      println(">>>>>>>>>>>>>>>>>>>>>>this batch took " + (endTime - startTime) + " milliseconds. data size is " + jsondata)
      println("##################################" + btime)
 
      puts.clear()
      println("puts clear after size : " + puts.size())
 
    }
    KafkaOffset_ZKManager.storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, groupName)
  })
 
  ssc.start()
  ssc.awaitTermination()
}
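As the comment in the listing notes, sharing conn, table and puts from the driver only works in local mode. A cluster-safe sketch, assuming the same JSON layout and the imports already present in the file, replaces the map/count with foreachPartition so that each executor opens its own HBase connection:

message.foreachPartition { iter =>
  // one connection per partition, created on the executor itself
  val conf = HBaseConfiguration.create
  conf.set("hbase.zookeeper.quorum", "ip")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  val conn = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(TableName.valueOf("wms_schedule_main"))
  val puts = new util.ArrayList[Put]
  iter.foreach { json =>
    val rowKey = json.getString("biid").reverse + json.getString("chdt")
    val put = new Put(Bytes.toBytes(rowKey))
    json.keySet().toArray.foreach { k =>
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(k.toString),
        Bytes.toBytes(json.getString(k.toString)))
    }
    puts.add(put)
  }
  if (!puts.isEmpty) table.put(puts) // one batched write per partition
  table.close()
  conn.close()
}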

Summary: 1. Parse the MySQL binlog, wrap each row change as JSON, and write it to Kafka in real time. 2. Spark Streaming consumes the Kafka data and parses the JSON. 3. Walk the JSON keys as column names, pair them with their values, and write to HBase (mind the HBase rowkey design).
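On the rowkey point: the listing reverses biid to keep a monotonically increasing id from hot-spotting a single region; the md5 comment in the original hints at an alternative, prefixing the natural key with a short hash. A minimal sketch (the helper name is hypothetical):

import java.security.MessageDigest

// an 8-hex-char MD5 prefix spreads consecutive ids across regions while
// keeping the rowkey deterministic, which also keeps replays idempotent
def saltedRowKey(biid: String, chdt: String): String = {
  val hash = MessageDigest.getInstance("MD5")
    .digest(biid.getBytes("UTF-8"))
    .map("%02x".format(_))
    .mkString
  hash.take(8) + "_" + biid + chdt
}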

Further topics: how to achieve exactly-once consumption semantics? Idempotent writes? SQL on HBase? HBase splits are heavy operations; how to keep them from delaying request latency?
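On the idempotent-write question specifically: an HBase Put is keyed by rowkey, so replaying the same binlog event overwrites the same cells. As long as the rowkey is deterministic, the sink is naturally idempotent, and at-least-once delivery from Kafka already yields effectively-once results.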


Original article: https://www.cnblogs.com/huanghanyu/p/13500838.html