Fixing Safe Mode errors in Spring Batch / Spring Cloud Task

Problem description

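Most companies have DBAs, and a DBA may well have enabled MySQL's safe update mode, which rejects any UPDATE whose WHERE clause does not filter on a key (indexed) column.
Spring Batch and Spring Cloud Task each keep sequence tables, such as BATCH_JOB_SEQ or TASK_SEQ, that hold a single row. The framework increments that row with an UPDATE that has no WHERE clause, so the statement is rejected: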

Could not increment ID for BATCH_JOB_SEQ sequence table; nested exception is java.sql.SQLException:
You are using safe update mode and you tried to update a table without a WHERE that uses a KEY column

Solution

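If the database is provided by the DBA, switching to another DBA-managed database will not help. The only way around it on the database side is to run your own instance outside the DBA's control.
A better solution is to replace Spring JDBC's MySQLMaxValueIncrementer.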

This class is created by DefaultDataFieldMaxValueIncrementerFactory:

//DefaultDataFieldMaxValueIncrementerFactory.java
@Override
	public DataFieldMaxValueIncrementer getIncrementer(String incrementerType, String incrementerName) {
		DatabaseType databaseType = DatabaseType.valueOf(incrementerType.toUpperCase());

		if (databaseType == DB2 || databaseType == DB2AS400) {
			return new DB2SequenceMaxValueIncrementer(dataSource, incrementerName);
		}
		else if (databaseType == DB2ZOS) {
			return new DB2MainframeSequenceMaxValueIncrementer(dataSource, incrementerName);
		}
		else if (databaseType == DERBY) {
			return new DerbyMaxValueIncrementer(dataSource, incrementerName, incrementerColumnName);
		}
		else if (databaseType == HSQL) {
			return new HsqlMaxValueIncrementer(dataSource, incrementerName, incrementerColumnName);
		}
		else if (databaseType == H2) {
			return new H2SequenceMaxValueIncrementer(dataSource, incrementerName);
		}
		else if (databaseType == MYSQL) {
			MySQLMaxValueIncrementer mySQLMaxValueIncrementer = new MySQLMaxValueIncrementer(dataSource, incrementerName, incrementerColumnName);
			mySQLMaxValueIncrementer.setUseNewConnection(true);
			return mySQLMaxValueIncrementer;
		}
....
}

So DefaultDataFieldMaxValueIncrementerFactory has to be replaced with a subclass that overrides only the MySQL branch. The code is as follows:

package io.github.slankka.springbatch.safemode.patch;

import org.springframework.batch.item.database.support.DefaultDataFieldMaxValueIncrementerFactory;
import org.springframework.batch.support.DatabaseType;
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;

import javax.sql.DataSource;

/**
 * project: springbatch safemode patch
 * <br/>To prevent error: <br/>
 * <code>
 * Could not increment ID for BATCH_JOB_SEQ sequence table; <br/>
 * nested exception is java.sql.SQLException: <br/>
 * You are using safe update mode and you tried to update a table without a WHERE that uses a KEY column <br/>
 * </code>
 *
 * @author slankka on 2019/8/30.
 */
public class SafeModeMysqlIncreamentFactory extends DefaultDataFieldMaxValueIncrementerFactory {

    private DataSource dataSource;
    private String incrementerColumnName = "ID";

    public SafeModeMysqlIncreamentFactory(DataSource dataSource) {
        super(dataSource);
        this.dataSource = dataSource;
    }

    @Override
    public void setIncrementerColumnName(String incrementerColumnName) {
        super.setIncrementerColumnName(incrementerColumnName);
        this.incrementerColumnName = incrementerColumnName;
    }

    @Override
    public DataFieldMaxValueIncrementer getIncrementer(String incrementerType, String incrementerName) {
        DatabaseType databaseType = DatabaseType.valueOf(incrementerType.toUpperCase());
        if (databaseType == DatabaseType.MYSQL) {
            SafeModeMysqlMaxValueIncreamenter mySQLMaxValueIncrementer = new SafeModeMysqlMaxValueIncreamenter(dataSource, incrementerName, incrementerColumnName);
            mySQLMaxValueIncrementer.setUseNewConnection(true);
            return mySQLMaxValueIncrementer;
        }
        return super.getIncrementer(incrementerType, incrementerName);
    }
}

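The factory returns a SafeModeMysqlMaxValueIncreamenter, and this class is the key to the fix.

Copy the code of MySQLMaxValueIncrementer and change the SQL passed to stmt.executeUpdate by appending where columnName > 0 to the end.
That satisfies the safe mode check, provided the column is a key. If the column is not already the primary key, make it one.
Three tables need ID set as the primary key:
alter table batch_job_execution_seq ADD PRIMARY KEY (ID);
alter table batch_job_seq ADD PRIMARY KEY (ID);
alter table batch_step_execution_seq ADD PRIMARY KEY (ID);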
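The change described above amounts to one extra clause on one statement. The sketch below builds both statements as plain strings so they can be compared side by side. The class and method names (SeqUpdateSql, stockUpdate, safeModeUpdate) are made up for illustration, and the stock statement is simply the patched one with the appended clause removed.

```java
/** Illustration only: builds the two UPDATE statements as plain strings, no database involved. */
public class SeqUpdateSql {

    /** Statement shape before the patch: no WHERE clause, so safe update mode rejects it. */
    public static String stockUpdate(String table, String column, int cacheSize) {
        return "update " + table + " set " + column
                + " = last_insert_id(" + column + " + " + cacheSize + ")";
    }

    /** Same statement with the key-column condition appended, as the patch does. */
    public static String safeModeUpdate(String table, String column, int cacheSize) {
        return stockUpdate(table, column, cacheSize) + " where " + column + " > 0";
    }

    public static void main(String[] args) {
        System.out.println(stockUpdate("BATCH_JOB_SEQ", "ID", 1));
        System.out.println(safeModeUpdate("BATCH_JOB_SEQ", "ID", 1));
    }
}
```

One thing worth checking against your own schema: the condition only matches a row whose value is already positive. If the sequence row is seeded with 0, which I believe is what the stock Spring Batch MySQL schema script does, the update would match nothing, and a condition such as >= 0 would be the safer choice. Treat that as an assumption to verify, not as part of the original patch.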

The full getNextKey() of SafeModeMysqlMaxValueIncreamenter, with the modified update statement:

    @Override
    protected synchronized long getNextKey() throws DataAccessException {
        if (this.maxId == this.nextId) {
            /*
             * If useNewConnection is true, then we obtain a non-managed connection so our modifications
             * are handled in a separate transaction. If it is false, then we use the current transaction's
             * connection relying on the use of a non-transactional storage engine like MYISAM for the
             * incrementer table. We also use straight JDBC code because we need to make sure that the insert
             * and select are performed on the same connection (otherwise we can't be sure that last_insert_id()
             * returned the correct value).
             */
            Connection con = null;
            Statement stmt = null;
            boolean mustRestoreAutoCommit = false;
            try {
                if (this.useNewConnection) {
                    con = getDataSource().getConnection();
                    if (con.getAutoCommit()) {
                        mustRestoreAutoCommit = true;
                        con.setAutoCommit(false);
                    }
                } else {
                    con = DataSourceUtils.getConnection(getDataSource());
                }
                stmt = con.createStatement();
                if (!this.useNewConnection) {
                    DataSourceUtils.applyTransactionTimeout(stmt, getDataSource());
                }
                // Increment the sequence column...
                String columnName = getColumnName();
                try {
                    stmt.executeUpdate("update " + getIncrementerName() + " set " + columnName +
                            " = last_insert_id(" + columnName + " + " + getCacheSize() + ") where " +  columnName + " > 0");
                } catch (SQLException ex) {
                    throw new DataAccessResourceFailureException("Could not increment " + columnName + " for " +
                            getIncrementerName() + " sequence table", ex);
                }
                // Retrieve the new max of the sequence column...
                ResultSet rs = stmt.executeQuery(VALUE_SQL);
                try {
                    if (!rs.next()) {
                        throw new DataAccessResourceFailureException("last_insert_id() failed after executing an update");
                    }
                    this.maxId = rs.getLong(1);
                } finally {
                    JdbcUtils.closeResultSet(rs);
                }
                this.nextId = this.maxId - getCacheSize() + 1;
            } catch (SQLException ex) {
                throw new DataAccessResourceFailureException("Could not obtain last_insert_id()", ex);
            } finally {
                JdbcUtils.closeStatement(stmt);
                if (con != null) {
                    if (this.useNewConnection) {
                        try {
                            con.commit();
                            if (mustRestoreAutoCommit) {
                                con.setAutoCommit(true);
                            }
                        } catch (SQLException ex) {
                            // Pass the cause along so the commit failure is not lost
                            throw new DataAccessResourceFailureException(
                                    "Unable to commit new sequence value changes for " + getIncrementerName(), ex);
                        }
                        JdbcUtils.closeConnection(con);
                    } else {
                        DataSourceUtils.releaseConnection(con, getDataSource());
                    }
                }
            }
        } else {
            this.nextId++;
        }
        return this.nextId;
    }
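The maxId/nextId bookkeeping in the method above is easy to misread, so here is a minimal sketch that keeps the same arithmetic but swaps the database for an in-memory counter. Everything in it (the class name CachedKeySketch, the storedValue field standing in for the sequence row, the updateCount counter) is hypothetical and only meant to show how one UPDATE reserves a block of cacheSize keys.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: mimics the maxId/nextId bookkeeping with a fake in-memory sequence row. */
public class CachedKeySketch {
    private final int cacheSize;
    private long storedValue;     // stands in for the ID column of the sequence table
    private int updateCount = 0;  // number of simulated UPDATE statements
    private long nextId = 0;
    private long maxId = 0;

    public CachedKeySketch(long initialValue, int cacheSize) {
        this.storedValue = initialValue;
        this.cacheSize = cacheSize;
    }

    public synchronized long getNextKey() {
        if (maxId == nextId) {
            // Simulates: update ... set ID = last_insert_id(ID + cacheSize) where ID > 0
            storedValue += cacheSize;
            updateCount++;
            // Simulates: select last_insert_id()
            maxId = storedValue;
            // First key of the freshly reserved block
            nextId = maxId - cacheSize + 1;
        } else {
            // Still inside the reserved block: no database round trip needed
            nextId++;
        }
        return nextId;
    }

    public int getUpdateCount() {
        return updateCount;
    }

    public static void main(String[] args) {
        CachedKeySketch seq = new CachedKeySketch(10, 3);
        List<Long> keys = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            keys.add(seq.getNextKey());
        }
        // prints "[11, 12, 13, 14, 15] after 2 updates"
        System.out.println(keys + " after " + seq.getUpdateCount() + " updates");
    }
}
```

With a stored value of 10 and a cache size of 3, the first call reserves 11 to 13 in one update, and the fourth call triggers the second update that reserves 14 to 16.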

Usage

See slankka/spring-batch-safemode-patch

Original article: https://www.cnblogs.com/slankka/p/11444304.html