JDBC线程池创建与DBCP源码阅读

创建数据库连接是一个比较消耗性能的操作,同时在并发量较大的情况下创建过多的连接对服务器形成巨大的压力。对于资源的频繁分配﹑释放所造成的问题,使用连接池技术是一种比较好的解决方式。

在Java中,连接池已经有很多开源实现了,在这里使用commons-dbcp2这个包来

创建JDBC连接池

public final class JDBCUtil{
    private static DataSource myDataSource=null;
    private JDBCUtil(){
    }
    static {
        try{
            Properties pro=new Properties();
            InputStream is=JDBCUtil.class.getClassLoader().getResourceAsStream("mysqlPoolConf.properties");
            pro.load(is);
            myDataSource=BasicDataSourceFactory.createDataSource(pro);
        }
        catch(Exception e){
            e.printStackTrace();
        }
    }
    public static Connection getConnection()throws SQLException{
        return myDataSource.getConnection();
    }
    public static void close(Connection conn){
        if(conn!=null){
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

使用很简单:

(1)    创建DataSource对象实例myDataSource

(2)    通过myDataSource的getConnection()方法获得数据库连接并使用

(3)    用完后调用连接close()方法来归还连接。

当然,这人我们不禁心存疑问,调用连接close()方法不是不是将连接关闭了吗?那连接回到连接池又是怎么实现的?一起来跟着源码看看连接池的实现过程:

首先是连接池配置文件mysqlPoolConf.properties:

为连接池的一些属性配置,这里只列举了一些常用的:

#连接设置
driverClassName=com.mysql.jdbc.Driver
url=jdbc:mysql://172.16.20.242:3306/test?characterEncoding=utf8
username=root
password=123456
#<!-- 初始化连接 -->
initialSize=20
#<!-- 最大空闲连接 -->
maxIdle=20
#<!-- 最小空闲连接 -->
minIdle=5
#最大连接数量
maxActive=100
#是否在自动回收超时连接的时候打印连接的超时错误
logAbandoned=true
#是否自动回收超时连接
removeAbandoned=true
#超时时间(以秒数为单位)
#设置超时时间有一个要注意的地方,超时时间=现在的时间-程序中创建Connection的时间,如果 maxActive比较大,比如超过100,那么removeAbandonedTimeout可以设置长一点比如180,也就是三分钟无响应的连接进行回收,当然应用的不同设置长度也不同。
removeAbandonedTimeout=180
#<!-- 超时等待时间以毫秒为单位 -->
#maxWait代表当Connection用尽了,多久之后进行回收丢失连接
maxWait=1000

跟随源码看看连接池的实现逻辑:

BasicDataSourceFactory为一个工厂类,在调用它的方法createDataSource(Properties properties)时,通过createDataSource()创建了一个PoolingDataSource对象的实例

createDataSource()源码如下,我在源码中加了一部分注释:

protected DataSource createDataSource() throws SQLException {
    if (closed) {
        throw new SQLException("Data source is closed");
    }
    // Return the pool if we have already created it
    // This is double-checked locking. This is safe since dataSource is
    // volatile and the code is targeted at Java 5 onwards.
    if (dataSource != null) {//检查dataSource是否已经关闭
        return dataSource;
    }
    synchronized (this) {//同步代码块,避免多个线程来同时创建连接池
        //如果dataSource已经创建,直接返回, 由于同步代码块需要获得锁,
            //可能其他线程已经创建了dataSource,再释放锁,
        //因此,在获得锁之后还需要再判断一下,这一点在平时编程中容易忽略。
        if (dataSource != null) {
            return dataSource;
        }
        jmxRegister();//jmx注册,不影响整体流程
        // create factory which returns raw physical connections
        //调用createConnectionFactory()创建JDBC连接工厂driverConnectionFactory,这个工厂使用数据库驱动来创建最底层的JDBC连接(即物理连接)
        ConnectionFactory driverConnectionFactory = createConnectionFactory();
        // Set up the poolable connection factory
        //调用createConnectionPool()创建数据源使用的连接池,连接池顾名思义就是缓存JDBC连接的地方。
        boolean success = false;
        PoolableConnectionFactory poolableConnectionFactory;//连接池工厂,保存了一系列的状态信息 和 JDBC连接工厂
        try {
            poolableConnectionFactory = createPoolableConnectionFactory(driverConnectionFactory);
            poolableConnectionFactory.setPoolStatements(poolPreparedStatements);
            poolableConnectionFactory.setMaxOpenPrepatedStatements(maxOpenPreparedStatements);
            success = true;
        } catch (SQLException se) {
            throw se;
        } catch (RuntimeException rte) {
            throw rte;
        } catch (Exception ex) {
            throw new SQLException("Error creating connection factory", ex);
        }
        if (success) {
            // create a pool for our connections
            //创建 GenericObjectPool<PoolableConnection>类的实例
            //GenericObjectPool是apach-commons-pool包的一个通用对象池类,其构造函数参数是一个对象工厂实例
            createConnectionPool(poolableConnectionFactory);
        }
        // Create the pooling data source to manage connections
        DataSource newDataSource; 
        success = false;
        try {
            newDataSource = createDataSourceInstance();//创建了一个PoolingDataSource的实例
            newDataSource.setLogWriter(logWriter);
            success = true;
        } catch (SQLException se) {
            throw se;
        } catch (RuntimeException rte) {
            throw rte;
        } catch (Exception ex) {
            throw new SQLException("Error creating datasource", ex);
        } finally {
            if (!success) {
                closeConnectionPool();
            }
        }
        // If initialSize > 0, preload the pool
        try {
            for (int i = 0 ; i < initialSize ; i++) {
                connectionPool.addObject();//初始化initialSize个JDBC连接
            }
        } catch (Exception e) {
            closeConnectionPool();
            throw new SQLException("Error preloading the connection pool", e);
        }
        // If timeBetweenEvictionRunsMillis > 0, start the pool's evictor task
        startPoolMaintenance();
        dataSource = newDataSource;
        return dataSource;
    }
}

在这一步创建了一个池化连接类工厂的实例,真正的连接是由这个工厂创建并放入连接池,GenericObjectPool是连接池的具体管理者,它是在commons-pool2这个包中实现的一个通用的对象池,GenericObjectPool负责连接池的管理(包括初始化连接,将多余的空闲连接关闭等)。之后,在使用需要使用数据库的Connection的时候,通过PoolingDataSource的getConnection() 方法获得一个连接,getConnection()方法实现如下:

public Connection getConnection() throws SQLException {
    try {
        //_pool是在BasicDataSource创建的GenericObjectPool<PoolableConnection> connectionPool这个实例
        C conn = _pool.borrowObject();//从JDBC连接池中获得一个JDBC连接
        if (conn == null) {
            return null;
        }
        //PoolGuardConnectionWrapper是DBCP对JDBC Connection这个类的一个封装
        return new PoolGuardConnectionWrapper<>(conn);
    } catch(SQLException e) {
        throw e;
    } catch(NoSuchElementException e) {
        throw new SQLException("Cannot get a connection, pool error " + e.getMessage(), e);
    } catch(RuntimeException e) {
        throw e;
    } catch(Exception e) {
        throw new SQLException("Cannot get a connection, general error", e);
    }
}

_pool.borrowObject()得到是PoolableConnection对象的实例,其继承自DelegatingConnection。

PoolGuardConnectionWrapper继承了DelegatingConnection这个类且对父类并没有做太多的改动。DelegatingConnection实现了java.sql.Connection接口,createStatement(),prepareStatement(),commit(),rollback()这些方法和我们以往

通过DriverManager.getConnection()得到的Connection使用方法是一致的。

重点在调用colse()方法时,并不是真正的关闭了连接,而是对连接做了状态清理后将

通过GenericObjectPool的returnObject方法将其归还至连接池了,看看源码就明白了。

先看看DelegatingConnection这个类实现的close方法,主要是做了Statement和ResultSet的清理:

public void close() throws SQLException {
    if (!_closed) {
        closeInternal();
    }
}
protected final void closeInternal() throws SQLException {
    try {
        passivate();
    } finally {
        if (_conn != null) {
            try {
                _conn.close();
            } finally {
                _closed = true;
            }
        } else {
            _closed = true;
        }
    }
}
protected void passivate() throws SQLException {
    // The JDBC spec requires that a Connection close any open
    // Statement's when it is closed.
    // DBCP-288. Not all the traced objects will be statements
    //关闭  Statement and ResultSet
    List<AbandonedTrace> traces = getTrace();
    if(traces != null && traces.size() > 0) {
        Iterator<AbandonedTrace> traceIter = traces.iterator();
        while (traceIter.hasNext()) {
            Object trace = traceIter.next();
            if (trace instanceof Statement) {
                ((Statement) trace).close();
            } else if (trace instanceof ResultSet) {
                // DBCP-265: Need to close the result sets that are
                // generated via DatabaseMetaData
                ((ResultSet) trace).close();
            }
        }
        clearTrace();
    }
    setLastUsed(0);
}

PoolableConnection对close()方法进行了复写:

/**
 * Returns me to my pool.
 */
 @Override
public synchronized void close() throws SQLException {
    if (isClosedInternal()) {
        // already closed
        return;
    }
    boolean isUnderlyingConectionClosed;
    try {
        isUnderlyingConectionClosed = getDelegateInternal().isClosed();
    } catch (SQLException e) {
        try {
            _pool.invalidateObject(this);
        } catch(IllegalStateException ise) {
            passivate();//在这个方法内部还是调用的父类DelegatingConnection的passivate()方法
            getInnermostDelegate().close();
        } catch (Exception ie) {
            // DO NOTHING the original exception will be rethrown
        }
        throw new SQLException("Cannot close connection (isClosed check failed)", e);
    }
     
    if (isUnderlyingConectionClosed) {
        try {
            _pool.invalidateObject(this);
        } catch(IllegalStateException e) {
            passivate();
            getInnermostDelegate().close();
        } catch (Exception e) {
            throw new SQLException("Cannot close connection (invalidating pooled object failed)", e);
        }
    } else {
        try {
            _pool.returnObject(this);
        } catch(IllegalStateException e) {
            passivate();
            getInnermostDelegate().close();
        } catch(SQLException e) {
            throw e;
        } catch(RuntimeException e) {
            throw e;
        } catch(Exception e) {
            throw new SQLException("Cannot close connection (return to pool failed)", e);
        }
    }
}
@Override
protected void passivate()throws SQLException {
    super.passivate();
    setClosedInternal(true);
}

基本逻辑是调用父类的passivate()方法做状态清理,然后将连接归还给连接池。因此,回到本篇开始的地方,调用close()方法是不会关闭该连接的。

 

最后,DBCP这个包大致的框架结构如下


主要有以下几点:

1、 实现池化连接对象PoolableConnection,这个即是我们操作数据的连接对象

2、 池化连接对象工厂类PoolableConnectionFactory,池化连接对象的创建工厂

3、 BasicDataSource工厂类BasicDataSourceFactory,用以创建PoolingDataSource类的实例

4、 PoolingDataSource使用PoolableConnectionFactory创建池化连接,并使用GenericObjectPool作为池对象的管理者。

5、 由于一个connection可以对应多个statement,PoolableConnection内部使用KeyedObjectPool来管理这些statement。

个人理解,不足或者错误之处敬请指出~~~

原文地址:https://www.cnblogs.com/cl1024cl/p/6205015.html