java 连接池的简单实现

  最近一个项目中需要自己写个连接池, 写了一个下午,挺辛苦的,但不知道会不会出问题, 所以,贴到博客上,欢迎各路大神指点

1. 配置信息:

/**
 * 
 */
package cn.mjorcen.db.bean;

import java.util.ResourceBundle;

import org.apache.log4j.Logger;

/**
 * 
 * 配置信息
 * 
 * @author mjorcen
 * @email mjorcen@gmail.com
 * @dateTime Oct 5, 2014 3:02:56 PM
 * @version 1
 */
public class Configuration {
    private ResourceBundle resource;
    private Logger logger = Logger.getLogger(getClass());
    private String driverClassName = "com.mysql.jdbc.Driver";
    private String validationQuery = "SELECT 1";
    private String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull";
    private String user = "root";
    private String password = "";
    private int initialPoolSize = 3;
    private int minPoolSize = 3;
    private int maxPoolSize = 10;
    private int maxStatements = 30;
    private int maxIdleTime = 25000;
    private int idleConnectionTestPeriod = 18000;
    private int connectionLonger = 3600000;

    public Configuration() {
        super();

    }

    public Configuration(String _properties) {
        super();
        init(_properties);
    }

    /**
     * 
     * @param _properties
     * 
     * @author mjorcen
     * @email mjorcen@gmail.com
     * @dateTime Oct 5, 2014 3:08:54 PM
     * @version 1
     */
    private void init(String _properties) {
        resource = ResourceBundle.getBundle(_properties);
        try {
            String tmp = "";
            setDriverClassName(resource.getString("driverClassName"));
            setValidationQuery(resource.getString("validationQuery"));
            setUrl(resource.getString("jdbc_url"));
            setUser(resource.getString("jdbc_username"));
            setPassword(resource.getString("jdbc_password"));

            tmp = resource.getString("initialPoolSize");
            if (tmp != null) {
                setInitialPoolSize(Integer.parseInt(tmp));
            }
            tmp = resource.getString("minPoolSize");
            if (tmp != null) {
                setMinPoolSize(Integer.parseInt(tmp));
            }
            tmp = resource.getString("maxPoolSize");
            if (tmp != null) {
                setMaxPoolSize(Integer.parseInt(tmp));
            }
            tmp = resource.getString("maxStatements");
            if (tmp != null) {
                setMaxStatements(Integer.parseInt(tmp));
            }
            tmp = resource.getString("maxIdleTime");
            if (tmp != null) {
                setMaxIdleTime(Integer.parseInt(tmp));
            }
            tmp = resource.getString("idleConnectionTestPeriod");
            if (tmp != null) {
                setIdleConnectionTestPeriod(Integer.parseInt(tmp));
            }
            tmp = resource.getString("connectionLonger");
            if (tmp != null) {
                setConnectionLonger(Integer.parseInt(tmp));
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.error(e);
        }

    }

    public ResourceBundle getResource() {
        return resource;
    }

    public void setResource(ResourceBundle resource) {
        this.resource = resource;
    }

    public String getDriverClassName() {
        return driverClassName;
    }

    public void setDriverClassName(String driverClassName) {
        this.driverClassName = driverClassName;
    }

    public String getValidationQuery() {
        return validationQuery;
    }

    public void setValidationQuery(String validationQuery) {
        this.validationQuery = validationQuery;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getInitialPoolSize() {
        return initialPoolSize;
    }

    public void setInitialPoolSize(int initialPoolSize) {
        this.initialPoolSize = initialPoolSize;
    }

    public int getMinPoolSize() {
        return minPoolSize;
    }

    public void setMinPoolSize(int minPoolSize) {
        this.minPoolSize = minPoolSize;
    }

    public int getMaxPoolSize() {
        return maxPoolSize;
    }

    public void setMaxPoolSize(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    public int getMaxStatements() {
        return maxStatements;
    }

    public void setMaxStatements(int maxStatements) {
        this.maxStatements = maxStatements;
    }

    public int getMaxIdleTime() {
        return maxIdleTime;
    }

    public void setMaxIdleTime(int maxIdleTime) {
        this.maxIdleTime = maxIdleTime;
    }

    public int getIdleConnectionTestPeriod() {
        return idleConnectionTestPeriod;
    }

    public void setIdleConnectionTestPeriod(int idleConnectionTestPeriod) {
        this.idleConnectionTestPeriod = idleConnectionTestPeriod;
    }

    public int getConnectionLonger() {
        return connectionLonger;
    }

    public void setConnectionLonger(int connectionLonger) {
        this.connectionLonger = connectionLonger;
    }

}

2. connection 的包装类, 因为mysql 一个连接连接8小时就会被mysql 干掉;所以出此下策;

/**
 * 
 */
package cn.mjorcen.db.bean;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;

/**
 * 
 * 
 * @author mjorcen
 * @email mjorcen@gmail.com
 * @dateTime Oct 5, 2014 4:27:30 PM
 * @version 1
 */
public class WarpConnection {
    private Logger logger = Logger.getLogger(getClass());
    static private AtomicInteger atomicInteger = new AtomicInteger(0);
    private String name;
    private long connectionTime;
    private long lastWorkTime;
    private Connection connection;

    public long getConnectionTime() {
        return connectionTime;
    }

    public void setConnectionTime(long connectionTime) {
        this.connectionTime = connectionTime;
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public static WarpConnection warp(Connection connection) {
        WarpConnection warpConnection = new WarpConnection();
        warpConnection.setConnection(connection);
        warpConnection.setConnectionTime(System.currentTimeMillis());
        warpConnection.setName("name" + atomicInteger.getAndAdd(1));
        return warpConnection;
    }

    public boolean isTimeOut(long time) {
        boolean flag = System.currentTimeMillis() - this.connectionTime >= time;
        System.out.println("name is " + this.name + " ,connectionTime is "
                + connectionTime + ", flag is " + flag + " ,time is "+time);
        return flag;
    }

    public long getLastWorkTime() {
        return lastWorkTime;
    }

    public void setLastWorkTime(long lastWorkTime) {
        this.lastWorkTime = lastWorkTime;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result
                + ((connection == null) ? 0 : connection.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        WarpConnection other = (WarpConnection) obj;
        if (connection == null) {
            if (other.connection != null)
                return false;
        } else if (!connection.equals(other.connection))
            return false;
        return true;
    }

    /**
     * 查看链接是否有效
     * 
     * @param connectionLonger
     *            连接最大时间
     * @return
     * 
     * @author mjorcen
     * @email mjorcen@gmail.com
     * @dateTime Oct 5, 2014 5:21:07 PM
     * @version 1600000
     * @throws SQLException
     */
    public boolean veryfiConnection(int connectionLonger) {
        try {

            if (this.connection == null || this.connection.isClosed()
                    || isTimeOut(connectionLonger)) {
                return true;
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return false;
    }
}

3.连接池:

/**
 * 
 */
package cn.mjorcen.db.pool;

import java.sql.Connection;
import java.sql.SQLException;

/**
 * 
 * 数据源最高级别接口,定义了数据源的基本功能
 * 
 * @author mjorcen
 * @email mjorcen@gmail.com
 * @dateTime Oct 5, 2014 3:20:21 PM
 * @version 1
 */
public interface PooledDataSource {
    /**
     * 获取链接
     * 
     * @return
     * 
     * @author mjorcen
     * @email mjorcen@gmail.com
     * @dateTime Oct 5, 2014 3:23:03 PM
     * @version 1
     * @throws SQLException
     */
    Connection getConnection() throws Exception;

    /**
     * 销毁
     * 
     * @author mjorcen
     * @email mjorcen@gmail.com
     * @dateTime Oct 5, 2014 3:26:00 PM
     * @version 1
     */
    void destroy() throws Exception;

    /**
     * 释放
     * 
     * @param connection
     * 
     * @author mjorcen
     * @email mjorcen@gmail.com
     * @dateTime Oct 5, 2014 3:27:09 PM
     * @version 1
     */
    void release(Connection connection) throws Exception;

    /**
     * 数据源释放可用
     * 
     * @return
     * 
     * @author mjorcen
     * @email mjorcen@gmail.com
     * @dateTime Oct 5, 2014 3:28:15 PM
     * @version 1
     */
    boolean isAvailable();

}

一个简单的实现类如下:

/**
 * 
 */
package cn.mjorcen.db.pool.impl;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import cn.mjorcen.db.bean.Configuration;
import cn.mjorcen.db.bean.WarpConnection;
import cn.mjorcen.db.pool.PooledDataSource;

/**
 * 简单的线程池实现
 * 
 * @author mjorcen
 * @email mjorcen@gmail.com
 * @dateTime Oct 5, 2014 3:24:32 PM
 * @version 1
 */
public class AbstractPooledDataSource implements PooledDataSource {

    protected ConcurrentLinkedQueue<WarpConnection> idleQueue;
    protected ConcurrentLinkedQueue<WarpConnection> busyQueue;
    protected ThreadLocal<Connection> threadLocal;
    protected AtomicInteger totalSize;
    protected AtomicInteger currentSize;
    protected boolean available;
    protected Configuration configuration;
    final Lock lock = new ReentrantLock();//// final Condition notFull = lock.newCondition(); // 实例化两个condition
    final Condition notEmpty = lock.newCondition();

    public AbstractPooledDataSource(Configuration configuration)
            throws Exception {
        super();
        this.configuration = configuration;
        idleQueue = new ConcurrentLinkedQueue<WarpConnection>();
        busyQueue = new ConcurrentLinkedQueue<WarpConnection>();
        threadLocal = new ThreadLocal<Connection>();
        totalSize = new AtomicInteger(0);
        currentSize = new AtomicInteger(0);
        init();
    }

    /**
     * 
     * 
     * @author mjorcen
     * @email mjorcen@gmail.com
     * @dateTime Oct 5, 2014 3:49:36 PM
     * @version 1
     * @throws ClassNotFoundException
     */
    private void init() throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        for (int i = 0; i < this.configuration.getInitialPoolSize(); i++) {
            idleQueue.add(WarpConnection.warp(openConnection()));
        }
        this.totalSize.set(this.configuration.getInitialPoolSize());
        available = true;
    }

    protected Connection openConnection() throws SQLException {
        return DriverManager.getConnection(configuration.getUrl(),
                configuration.getUser(), configuration.getPassword());
    }

    public Connection getConnection() throws SQLException {
        Connection connection = threadLocal.get();
        if (connection != null) {
            return connection;
        }
        try {
            lock.lock();
            WarpConnection warpConnection = null;
            try {
                warpConnection = this.idleQueue.remove();
            } catch (NoSuchElementException e) {
                warpConnection = getWarpConnection();
            }
            veryfiConnection(warpConnection);
            warpConnection.setLastWorkTime(System.currentTimeMillis());
            this.busyQueue.add(warpConnection);
            threadLocal.set(warpConnection.getConnection());
            return warpConnection.getConnection();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 检查链接状态
     * 
     * @author mjorcen
     * @email mjorcen@gmail.com
     * @dateTime Oct 5, 2014 5:17:06 PM
     * @version 1
     * @param warpConnection
     * @throws SQLException
     */
    private void veryfiConnection(WarpConnection warpConnection)
            throws SQLException {
        if (warpConnection.veryfiConnection(this.configuration
                .getConnectionLonger())) {
            warpConnection.setConnection(openConnection());
            warpConnection.setConnectionTime(System.currentTimeMillis());
        }
    }

    /**
     * 
     * @return
     * 
     * @author mjorcen
     * @email mjorcen@gmail.com
     * @dateTime Oct 5, 2014 4:44:52 PM
     * @version 1
     * @throws SQLException
     */
    private WarpConnection getWarpConnection() throws SQLException {
        WarpConnection warpConnection = null;

        if (this.totalSize.get() < configuration.getMaxPoolSize()) {
            warpConnection = WarpConnection.warp(openConnection());
            this.totalSize.addAndGet(1);
            return warpConnection;
        }
        while (true) {
            try {
                warpConnection = this.idleQueue.remove();
                return warpConnection;
            } catch (NoSuchElementException e) {
                try {
                    this.notEmpty.wait();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }

    }

    public void destroy() {
        this.available = false;
        ConcurrentLinkedQueue<WarpConnection> _idleQueue = this.idleQueue;
        ConcurrentLinkedQueue<WarpConnection> _busyQueue = this.busyQueue;
        this.idleQueue = null;
        this.busyQueue = null;
        this.threadLocal = null;
        for (WarpConnection connection : _idleQueue) {
            closeQuiet(connection.getConnection());
        }
    }

    private void closeQuiet(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public void release(Connection connection) throws Exception {
        try {
            lock.lock();
            if (this.available) {
                WarpConnection warpConnection = null;
                for (WarpConnection element : this.busyQueue) {
                    if (element.getConnection().equals(connection)) {
                        warpConnection = element;
                        break;
                    }
                }
                this.busyQueue.remove(warpConnection);
                this.idleQueue.add(warpConnection);
                // System.out.println("busyQueue = " + busyQueue.size());
                // System.out.println("idleQueue = " + idleQueue.size());
                threadLocal.set(null);
                notEmpty.signal();// 一旦插入就唤醒取数据线程
            } else {
                closeQuiet(connection);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public boolean isAvailable() {
        return available;
    }

}

调用类:

/**
 * 
 */
package cn.mjorcen.db.test;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import cn.mjorcen.db.bean.Configuration;
import cn.mjorcen.db.pool.impl.AbstractPooledDataSource;

/**
 * 
 * 
 * @author mjorcen
 * @email mjorcen@gmail.com
 * @dateTime Oct 5, 2014 4:00:09 PM
 * @version 1
 */
public class Client {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration("product_db");
        final AbstractPooledDataSource dataSource = new AbstractPooledDataSource(
                conf);
        ExecutorService executor = Executors.newFixedThreadPool(10);

        Runnable r = new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < 3; i++) {
                        Connection connection = dataSource.getConnection();
                        System.out.println(Thread.currentThread().getName()
                                + " : " + connection);
                        Thread.sleep(3000);
                        dataSource.release(connection);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 10; i++) {

            executor.execute(r);
        }
        // Connection connection = dataSource.getConnection();
        // connection = dataSource.getConnection();
        // System.out.println(connection);
        // dataSource.release(connection);
    }
}

配置文件:

driverClassName=com.mysql.jdbc.Driver
validationQuery=SELECT 1
jdbc_url=jdbc:mysql://115.29.36.149:3306/sai_zd?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
jdbc_username=c
jdbc_password=c
initialPoolSize=3
minPoolSize=3
maxPoolSize=10
maxStatements=30
maxIdleTime=25000
idleConnectionTestPeriod=18000
connectionLonger=3
原文地址:https://www.cnblogs.com/mjorcen/p/4007341.html