Java 之 JDBC线程池(源码版)

一、目录

二、代码

PoolConstant

package cn.kgc.kb08.jdbc.dao3.impl;

public interface PoolConstant {
    String POOL_CORE_COUNT="coreCount";
    String POOL_MAX_COUNT="maxCount";
    String POOL_MAX_IDELE="maxIdel";
    String POOL_MAX_WAIT="maxWait";
    String POOL_RETRY_INTERVAL="retryInterval";
    String POOL_MAX_RETRY_COUNT="maxRetryCount";
    String POOL_EXIT_ON_ERR="exitOnErr";

    String[] POOL={
            POOL_CORE_COUNT,
            POOL_MAX_COUNT,
            POOL_MAX_IDELE,
            POOL_MAX_WAIT,
            POOL_RETRY_INTERVAL,
            POOL_MAX_RETRY_COUNT,
            POOL_EXIT_ON_ERR
    };




    String MYSQL_DRI="driver";
    String MYSQL_URI="url";
    String MYSQL_USER="username";
    String MYSQL_PASS="password";
    String[] MYSQL = {
            MYSQL_DRI,
            MYSQL_URI,
            MYSQL_USER,
            MYSQL_PASS
    };




}

PoolUtil

package cn.kgc.kb08.jdbc.dao3.impl;

import cn.kgc.kb08.jdbc.dao2.Dao;
import cn.kgc.kb08.jdbc.dao2.impl.BaseDao;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class PoolUtil {


    private static Dao dao;

    /**
     * 解析数据源配置信息
     * @param dataSource 数据源名称
     * @return Map<String,String>
     */
    protected static <T>Map<String,T> parse(Class<T> c,String dataSource, List<String> items){
//        File config = new File("config/sys.properties");
//        Properties pro = new Properties();
//        try {
//            pro.load(new FileInputStream(config));
//            Map<String,T> map = new HashMap<>(items.size());
//            for (String item : items) {
//                String key = dataSource+"."+ item;
//                if (!pro.containsKey(key)){
//                    throw new IOException("缺少配置项目"+item);
//                }
//                map.put(item,c.getConstructor(String.class).newInstance(pro.getProperty(key)));
//            }
//        } catch (Exception e) {
//            e.printStackTrace();
//            System.out.println("资源配置缺失,系统强制退出"+e.getMessage());
//            System.exit(-1);
//        }finally {
//            if(null!=pro){
//                pro.clear();
//                pro = null;
//
//            }
//        }
//
//        return null;
        File config = new File("config/sys.properties");
        Properties pro = new Properties();//Properties是一个文件
        try {
            pro.load(new FileInputStream(config));
            //final String[] items = {"driver", "url", "username", "password"};
            Map<String,T> map = new HashMap<>(items.size());
            for (String item : items) {
                String key = dataSource + "." + item;
                if (!pro.containsKey(key)) {
                    throw new IOException("缺少配置项:" + item);//不包含,就是缺项了
                }
                map.put(item,c.getConstructor(String.class).newInstance(pro.getProperty(key)));
            }
            return map;
        } catch (Exception e) {
            System.err.println(dataSource+"数据源配置信息异常,系统强制退出:" + e.getMessage());
            System.exit(-1);
        } finally {
            if (null != pro) {
                pro.clear();
                pro = null;
            }
        }
        return null;

    }


    protected  static void close(AutoCloseable...acs){
        for (AutoCloseable ac : acs) {
            if (null != ac) {
                try {
                    ac.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }}

}

***重点类**** ConPool

package cn.kgc.kb08.jdbc.dao3.impl;


import cn.kgc.kb08.jdbc.dao3.SelRtn;
import cn.kgc.kb08.jdbc.dao3.Dao;
import cn.kgc.kb08.jdbc.dao3.Pool;

import java.lang.reflect.Method;
import java.sql.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 弹性连接池:生产和管理对象的
 */
public final  class ConPool implements Pool {

//    pool.maxIdel=30是什么 => 请看文档,官方会写
//    pool.retryInterval=50
//    pool.maxRetryCount=8

    /**
     * 池中连接
     */
    class PoolCon {
        boolean free = true;
        boolean core;
        Connection con;
        long idleBegin;

        public PoolCon(boolean core, Connection con) {
            this.core = core;
            this.con = con;
            restIdle();
        }

        public void restIdle() {
            if (!core) {
                this.idleBegin = System.currentTimeMillis();
            }
        }


    }

    private ConcurrentMap<Integer, PoolCon> pool;
    private Map<String, Integer> cnfPool;
    private Map<String, String> cnfCon;
    /**
     * 执行定期清理线程池
     * 检查核心连接对象的有效性,无效则创建新核心连接对象覆盖
     * 检查临时连接对象是否超时,超时则关闭并移除
     */

    private ScheduledExecutorService schedule;
    private ExecutorService service;
    private Lock lock;
    private Condition cond;
    private boolean clearing;

    public ConPool() {
        initCnf();
        initPool();
        startClear();
    }

// 塞进pool和mysql的配置:比如Map中driver:xxx的键值对
    private void initCnf() {
        cnfPool = PoolUtil.parse(Integer.class, "pool",
                Arrays.asList(PoolConstant.POOL));
        cnfCon = PoolUtil.parse(String.class, "mysql01",
                Arrays.asList(PoolConstant.MYSQL));
    }


    //    初始化连接池
    private void initPool() {
        final int MAX_COUNT = cnfPool.get(PoolConstant.POOL_MAX_COUNT);
        service = Executors.newFixedThreadPool(MAX_COUNT * 2);
        schedule = Executors.newSingleThreadScheduledExecutor();
        lock = new ReentrantLock(true);
        cond = lock.newCondition();
        //分段锁的集合
        pool = new ConcurrentHashMap<>(MAX_COUNT);
        // 池中连接
        PoolCon pc;
        final int CORE_COUNT = cnfPool.get(PoolConstant.POOL_CORE_COUNT);
        for (Integer i = 0, j = 1; i <= CORE_COUNT; i++) {
            pc = makePoolCon(true);
            if (null != pc) {
                // 给核心连接一个编号
                pool.put(j++, pc);
            }
        }
        if (pool.size() == 0) {
            System.err.println("连接池初始化失败,系统强制退出");
            System.exit(-1);
        }
        // 如果配置让你失败便退出,且核心池数量小于一半
        if (cnfPool.get(PoolConstant.POOL_EXIT_ON_ERR) == 1
                && pool.size() <= CORE_COUNT / 2) {
            System.err.println("连接池初始化过半异常,系统强制退出");
            System.exit(-1);

        }
    }


    /**
     * 创建一个池中的连接对象
     *
     * @param core 池对象类型,true:核心对象,false:临时对象
     * @return
     */

    private PoolCon makePoolCon(boolean core) {
        PoolCon pc = null;
        // 最大重试次数,创建n次,创建出一个连接对象
        for (int i = 0; i <= cnfPool.get(PoolConstant.POOL_MAX_RETRY_COUNT); i++) {
            try {
                Connection con = DriverManager.getConnection(
                        cnfCon.get(PoolConstant.MYSQL_URI),
                        cnfCon.get(PoolConstant.MYSQL_USER),
                        cnfCon.get(PoolConstant.MYSQL_PASS)
                );
                pc = new PoolCon(core, con);
            } catch (SQLException e) {
                try {
                    // 创建失败就休息片刻再创建(重试)
                    TimeUnit.SECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL));
                    continue;
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                    System.out.println("cuocuocuo");
                }
                e.printStackTrace();
            }
        }
        return pc;

    }

    /**
     * 验证核心连接对象是否有效
     *
     * @param pc
     * @return
     */
    private boolean isPCValid(PoolCon pc) {
        try {
            pc.con.createStatement().executeQuery("select 1");
            return true;
        } catch (SQLException e) {
            return false;
        }
    }

    /**
     * 验证临时连接对象是否过期
     *
     * @param pc 池连接对象
     * @return true:过期,false:没过期
     */
    private boolean isExpired(PoolCon pc) {
        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - pc.idleBegin) >=
                cnfPool.get(PoolConstant.POOL_MAX_IDELE);
    }


    /**
     * 验证用户是否超出配置最大时限
     * @param waitBegin   计算参考起点时间
     * @return
     */
    private boolean isWaitExpired(long waitBegin){
        return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()-waitBegin)>=
                cnfPool.get(PoolConstant.POOL_MAX_WAIT);
    }

    /**
     * 开启定期清理任务
     * maxIdle,最长闲置时间
     */
    private void startClear() {
        int delay = cnfPool.get(PoolConstant.POOL_MAX_IDELE);
        schedule.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                clearing = true;
                for (Integer key : pool.keySet()) {
                    PoolCon pc = pool.get(key);
                    if (!pc.free) {
                        continue;
                    }
                    if (pc.core) {
                        if (!isPCValid(pc)) {
                            pool.put(key, makePoolCon(true));
                        }
                    } else {
                        if (isExpired(pc) || !isPCValid(pc)) {
                            pool.remove(key);
                        }
                    }
                }


                clearing = false;
                cond.signalAll();
                lock.unlock();
            }
        }, delay, delay, TimeUnit.SECONDS);
    }


    /**
     * 连接池销毁
     */
    @Override
    public void destory() {
        while (pool.size() > 0) {
            for (Integer key : pool.keySet()) {
                PoolCon pc = pool.get(key);
                if (pc.free) {
                    pc.free = false;
                    PoolUtil.close(pc.con);
                    pool.remove(key);
                }
            }
            try {
                TimeUnit.MILLISECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }


    }


    /**
     *
     * @return
     */
    private PoolCon fetch() {
        long waitBegin = System.currentTimeMillis();

            for (Integer i = 0; i <= cnfPool.get(PoolConstant.POOL_MAX_RETRY_COUNT); i++) {
                try {
                    lock.lock();
                    if (clearing) {
                        cond.await();
                    }
                    for (Integer key : pool.keySet()) {
                        PoolCon pc = pool.get(key);
                        if (pc.free && isPCValid(pc)) {
                            pc.free = false;
                            return pc;
                        }
                    }
                    if(isWaitExpired(waitBegin)){
                        return null;
                    }
                    TimeUnit.MILLISECONDS.sleep(cnfPool.get(PoolConstant.POOL_RETRY_INTERVAL));
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
            if(pool.size()< cnfPool.get(PoolConstant.POOL_MAX_COUNT)){
                   PoolCon pc = makePoolCon(false);
                   if (null != pc){
                       pc.free = false;
                      pool.put(pool.size()+1,pc);
                      return pc;
                   }
            }
        return null;
    }




    private  void  giveback(PoolCon pc){
        if(null==pc){
            return;
        }
        if(!pc.core){
            pc.restIdle();
        }
        pc.free = true;
    }



@Override
    public Dao newDao(){
        return new Dao() {
            private PreparedStatement getPst(Connection con, final String SQL, Object... params) throws SQLException {
                PreparedStatement pst = con.prepareStatement(SQL);
                if (null != params && params.length > 0) {
                    for (int i = 0; i < params.length; i++) {
                        pst.setObject(i + 1, params[i]);
                    }
                }
                return pst;
            }

            private int update(PreparedStatement pst) throws SQLException {
                return pst.executeUpdate();
            }

            private ResultSet query(PreparedStatement pst) throws SQLException {
                return pst.executeQuery();
            }

            private Map<String, Method> parseMethod(Class c) {
                Map<String, Method> mapMethod = new HashMap<>();
                final String PREFIX = "set";
                for (Method method : c.getDeclaredMethods()) {
                    String name = method.getName();
                    if (!name.startsWith(PREFIX)) {
                        continue;
                    }
                    name = name.substring(3);
                    name = name.substring(0, 1).toLowerCase() + name.substring(1);
                    mapMethod.put(name, method);
                }
                return mapMethod;
            }

            private String[] parseStruct(ResultSetMetaData md) throws SQLException {
                String[] names = new String[md.getColumnCount()];
                for (int i = 0; i < names.length; i++) {
                    names[i] = md.getColumnLabel(i + 1);
                }
                return names;
            }

            @Override
            public int exeUpd(final String SQL, final Object... params) {
                try {
                    return service.submit(new Callable<Integer>() {
                        @Override
                        public Integer call() throws Exception {
                            int rst = 0;
                            PoolCon pc = null;
                            //                        Connection con = null;
                            PreparedStatement pst = null;
                            try {
                                pc = fetch();
                                if (null != pc) {
                                    pst = getPst(pc.con, SQL, params);
                                    rst = update(pst);
                                }

                            } catch (SQLException e) {
                                rst = -1;
                            } finally {
                                PoolUtil.close(pst);
                                giveback(pc);
                            }
                            return rst;
                        }
                    }).get();
                } catch (Exception e) {
                    return -1;
                }

            }

            @Override
            public <T> SelRtn exeSingle(final Class<T> c, final String SQL, final Object... params) {
                try {
                    return service.submit(new Callable<SelRtn>() {
                        @Override
                        public SelRtn call() throws Exception {
                            PoolCon pc = null;
                            PreparedStatement pst = null;
                            ResultSet rst = null;
                            try {
                                pc = fetch();
                                pst = getPst(pc.con, SQL, params);
                                rst = query(pst);
                                if (null != rst && rst.next()) {
                                    //                调用类型(非Character基本类型包装类)c的,带有唯一字符串参数的构造方法
                                    //                c.getConstructor(String.class)//基本类型创建对象
                                    return SelRtn.succeed(
                                            c.getConstructor(String.class).newInstance(rst.getObject(1).toString()));
                                } else {
                                    return SelRtn.succeed(null);
                                }
                            } catch (Exception e) {
                                e.printStackTrace();

                            } finally {
                                //                           close(rst, pst, con);
                                PoolUtil.close(rst, pst);
                                giveback(pc);
                            }
                            return SelRtn.fail();
                        }
                    }).get();
                } catch (Exception e) {
                    return SelRtn.fail();
                }
            }

            @Override
            public <T> SelRtn exeQuery(final Class<T> c, final String SQL, final Object... params) {
                try {
                    return service.submit(new Callable<SelRtn>() {
                        @Override
                        public SelRtn call() throws Exception {
                            PoolCon pc = null;
                            PreparedStatement pst = null;
                            ResultSet rst = null;
                            try {
                                pst = getPst(pc.con, SQL, params);
                                rst = query(pst);
                                if (null != rst && rst.next()) {
                                    List<T> list = new ArrayList<>();
                                    Map<String, Method> map = parseMethod(c);
                                    String[] names = parseStruct(rst.getMetaData());
                                    do {
                                        T t = c.newInstance();
                                        for (String name : names) {
                                            map.get(name).invoke(t, rst.getObject(name));
                                        }
                                        list.add(t);
                                    } while (rst.next());
                                    return SelRtn.succeed(list);
                                } else {
                                    return SelRtn.succeed(null);
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            } finally {
                                PoolUtil.close(rst, pst);
                                giveback(pc);
                            }
                            return SelRtn.fail();
                        }
                    }).get();
                } catch (Exception e) {
                    return SelRtn.fail();
                }
            }
        };


    }
}

Pool

public interface Pool {
    void destory();
    Dao newDao();
}

PoolFactory

package cn.kgc.kb08.jdbc.dao3;

import cn.kgc.kb08.jdbc.dao3.impl.ConPool;

public abstract class PoolFactory {
    private static Dao dao;
    private static synchronized  void init(){
        if(null==dao){
            dao = new ConPool().newDao();
        }
    }

    public static Dao get(){
        if(null==dao){
            init();
        }
        return dao;
    }
}

SelRtn

package cn.kgc.kb08.jdbc.dao3;

/**
 * 完善查询操作返回类型,对于异常的缺失
 */
public final  class SelRtn {
    private boolean err = false;
    private Object rtn;

    public static SelRtn succeed(Object rtn){
        return new SelRtn(rtn);
    }
    public static SelRtn fail(){
        return new SelRtn();
    }



    private SelRtn(Object rtn) {
        this.rtn = rtn;
    }

    private SelRtn() {
        this.err = true;
    }

    public boolean isErr(){
        return this.err;
    }

    public <T> T getRtn(){
        return (T) rtn;
    }

}
原文地址:https://www.cnblogs.com/sabertobih/p/14011825.html