The last packet sent successfully to the server was 24,211,129 milliseconds ago. is longer than the server configured value of 'wait_timeout'....

flink连接mysql报错连接超时

报错内容:mysql连接超时

 报错原因分析:

  flink在流式程序在向mysql写入数据的时候,由于已经跟mysql建立的连接长时间没有想mysql中写入数据,查过了mysql的最大心跳时间(mysql默认是8小时),此时mysql会将连接关闭,导致再次写入数据的时候会报错误。

解决方案

可以在程序中新增一个定时器,每隔一定时间(小于mysql的最大心跳8小时)去访问一下mysql数据库,保证与mysql的连接处于活动状态

package com.write.flink.sink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;

/**
 * @Author Guo
 * @Description 唤醒数据库连接
 * @Date 2021/1/20 16:31
 **/

public class DbKeepConnectionSink  extends TimerTask {

    private final Logger LOGGER = LoggerFactory.getLogger(DbKeepConnectionSink.class);
    Map<String, Connection> connMap;

    public DbKeepConnectionSink(Map<String, Connection> connMap){
        this.connMap = connMap;
    }
    @Override
    public void run() {
        LOGGER.warn("唤醒数据库连接...");
        keepDbAlive(connMap);
    }
    public void keepDbAlive(Map<String, Connection> connMap) {

        Set<String> conns = connMap.keySet();
        Connection conn = null;
        for (String connKey:conns) {
            String sql = "select 1 as t from dual limit 1";
            System.out.println("sql===> " + sql);
            conn = connMap.get(connKey);
            if (null != conn) {
                try {
                    conn.prepareStatement(sql).execute();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

然后可以在flink程序初始换时候进行调用:

   /**
     * 唤醒数据库连接,每个WAKE_UP_DB_CONNECTION小时唤醒一次
     */
    public void keepConnecting(Map<String, Connection> connMap) {
        new Timer().schedule(new DbKeepConnectionSink(connMap), new Date(), 1000 * 60 * WAKE_UP_DB_CONNECTION);
    }

上述方法是指解决问题的其中的一个思路,相信肯定还有更好的方式

原文地址:https://www.cnblogs.com/Gxiaobai/p/14305959.html