flink连接mysql报错连接超时
报错内容:mysql连接超时
报错原因分析:
flink在流式程序在向mysql写入数据的时候,由于已经跟mysql建立的连接长时间没有想mysql中写入数据,查过了mysql的最大心跳时间(mysql默认是8小时),此时mysql会将连接关闭,导致再次写入数据的时候会报错误。
解决方案
可以在程序中新增一个定时器,每隔一定时间(小于mysql的最大心跳8小时)去访问一下mysql数据库,保证与mysql的连接处于活动状态
package com.write.flink.sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
/**
* @Author Guo
* @Description 唤醒数据库连接
* @Date 2021/1/20 16:31
**/
public class DbKeepConnectionSink extends TimerTask {
private final Logger LOGGER = LoggerFactory.getLogger(DbKeepConnectionSink.class);
Map<String, Connection> connMap;
public DbKeepConnectionSink(Map<String, Connection> connMap){
this.connMap = connMap;
}
@Override
public void run() {
LOGGER.warn("唤醒数据库连接...");
keepDbAlive(connMap);
}
public void keepDbAlive(Map<String, Connection> connMap) {
Set<String> conns = connMap.keySet();
Connection conn = null;
for (String connKey:conns) {
String sql = "select 1 as t from dual limit 1";
System.out.println("sql===> " + sql);
conn = connMap.get(connKey);
if (null != conn) {
try {
conn.prepareStatement(sql).execute();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
然后可以在flink程序初始换时候进行调用:
/**
* 唤醒数据库连接,每个WAKE_UP_DB_CONNECTION小时唤醒一次
*/
public void keepConnecting(Map<String, Connection> connMap) {
new Timer().schedule(new DbKeepConnectionSink(connMap), new Date(), 1000 * 60 * WAKE_UP_DB_CONNECTION);
}
上述方法是指解决问题的其中的一个思路,相信肯定还有更好的方式