mybatis——datasource

本来准备看下DruidDatasource的,发现太复杂了,还是研究一下mybatis的datasource吧,目的:

  • 弄清datasource的基本原理
  • 简单的调优

mybatis的datasource实现:无连接池实现的UnpooledDataSource,连接池实现的PooledDataSource

 一、DataSource

DataSource是JDK中提供数据库库连接的顶级接口。DataSource类图

 主要实现:

getConnection():获取一个数据库连接。

getLogWriter():日志

setLoginTimeout:数据源在尝试连接数据库时的最大等待时间

wrapper:包装器,例如BeanWrapper包装Bean,允许实例包装datasource。

二、UnpooledDataSource

 UnpooledDataSource是一个无连接池实现的DataSource,JDBC的简单封装,开启一个连接,执行完关闭连接。

1、变量与构造方法

  private ClassLoader driverClassLoader;//Driver的类加载器
  private Properties driverProperties;//启动属性
  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();//驱动程序容器

  private String driver;//数据库连接驱动程序
  private String url;//数据库连接url
  private String username;//用户名
  private String password;//密码

  private Boolean autoCommit;//自动提交
  private Integer defaultTransactionIsolationLevel;//默认事务隔离级别

  static {
    //类加载时,扫描DriverMananger中所有的数据库连接驱动程序
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
      Driver driver = drivers.nextElement();
      registeredDrivers.put(driver.getClass().getName(), driver);
    }
  }

  public UnpooledDataSource() {
  }

  public UnpooledDataSource(String driver, String url, String username, String password) {
    this.driver = driver;
    this.url = url;
    this.username = username;
    this.password = password;
  }

  public UnpooledDataSource(String driver, String url, Properties driverProperties) {
    this.driver = driver;
    this.url = url;
    this.driverProperties = driverProperties;
  }

  public UnpooledDataSource(ClassLoader driverClassLoader, String driver, String url, String username, String password) {
    this.driverClassLoader = driverClassLoader;
    this.driver = driver;
    this.url = url;
    this.username = username;
    this.password = password;
  }

  public UnpooledDataSource(ClassLoader driverClassLoader, String driver, String url, Properties driverProperties) {
    this.driverClassLoader = driverClassLoader;
    this.driver = driver;
    this.url = url;
    this.driverProperties = driverProperties;
  }

2、主要实现方法

getConnection():获取数据库连接,JDBC的封装使用。

/* org.apache.ibatis.datasource.unpooled.UnpooledDataSource*/
  public Connection getConnection() throws SQLException {
    return doGetConnection(username, password);
  }

  public Connection getConnection(String username, String password) throws SQLException {
    return doGetConnection(username, password);
  }

  private Connection doGetConnection(String username, String password) throws SQLException {
    Properties props = new Properties();
    if (driverProperties != null) {
      //设置driverProperties文件到属性对象中
      props.putAll(driverProperties);
    }
    if (username != null) {
      //设置username到属性对象中,注意显式配置username会覆盖driverPropertie文件中的
      props.setProperty("user", username);
    }
    if (password != null) {
      //设置password到属性对象中中,注意显式配置password会覆盖driverPropertie文件中的
      props.setProperty("password", password);
    }
    return doGetConnection(props);
  }

  private Connection doGetConnection(Properties properties) throws SQLException {
    //启动驱动程序
    //若在registerDrivers中(已启动),直接返回
    //若不在registerDrivers中,使用driverClassLoader(不为空)加载driver类,启动驱动程序
   //若driverClassLoader为空,所有的类加载器,遍历加载一次,直到能加载为止
    initializeDriver();
    //JDBC获取连接的语句
    Connection connection = DriverManager.getConnection(url, properties);
    //设置连接属性
    //自动提交autocommit
    //默认事务隔离级别:defaultTransactionIsolationLevel
    configureConnection(connection);
    return connection;
  }

3、总结

UnpooledDataSource就是JDBC的简单封装,没有其他逻辑。

① 类似工厂方法提供一个连接方法,getConnection().

② 启动数据库连接驱动程序driver,类加载器不是我以为的线程上下文类加载器,而是遍历所有的类加载器,直到找到合适的类加载器加载成功(cathc异常)。

三、PooledDataSource

PooledDataSource是数据库连接的连接池实现。前面研究过线程池ThreadPoolExecutor实现,多个线程+队列。这里猜测应该也是用的实现差不多,使用队列实现,不同在于这里队列里的元素时connection。

1、变量与构造方法

  private final PoolState state = new PoolState(this);//连接池中连接存放的容器(两个队列)

  private final UnpooledDataSource dataSource;//装饰者模式,PooledDataSource是一个增强的UnpooledDataSource

  // OPTIONAL CONFIGURATION FIELDS
  protected int poolMaximumActiveConnections = 10;//连接池中最大活跃连接数
  protected int poolMaximumIdleConnections = 5;//连接池中最大空闲连接数
  protected int poolMaximumCheckoutTime = 20000;//连接池中活跃连接的超时时间
  protected int poolTimeToWait = 20000;//连接池满时,获取连接的等待时间
  protected int poolMaximumLocalBadConnectionTolerance = 3;//连接池中坏连接容忍数
  protected String poolPingQuery = "NO PING QUERY SET";//获取连接后,ping数据库连接检测时执行的sql,(开启检测后需要设置,否则检测无效)
  protected boolean poolPingEnabled;//启用ping数据库连接检测的标识(实际使用经常是关闭的),获取连接后,ping数据库校验连接有效
  protected int poolPingConnectionsNotUsedFor;//一个时间参数,获取连接后,连接空闲时间大于此值,才需要进行ping数据库连接检测

  private int expectedConnectionTypeCode;//连接池属性集合的hashCode,("" + url + username + password).hashCode();

  public PooledDataSource() {
    dataSource = new UnpooledDataSource();
  }

  public PooledDataSource(UnpooledDataSource dataSource) {
    this.dataSource = dataSource;
  }

  public PooledDataSource(String driver, String url, String username, String password) {
    dataSource = new UnpooledDataSource(driver, url, username, password);
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
  }

  public PooledDataSource(String driver, String url, Properties driverProperties) {
    dataSource = new UnpooledDataSource(driver, url, driverProperties);
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
  }

  public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, String username, String password) {
    dataSource = new UnpooledDataSource(driverClassLoader, driver, url, username, password);
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
  }

  public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, Properties driverProperties) {
    dataSource = new UnpooledDataSource(driverClassLoader, driver, url, driverProperties);
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
  }

2、主要实现

① 连接池容器PoolState

PoolState是实际的连接池容器,里面还记录了连接池的一些属性,主要看看属性变量,部分属性是连接池调优的参考参数。

  protected PooledDataSource dataSource;//记录实际的数据源
  protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();//空闲连接队列
  protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();//活动连接队列
  protected long requestCount = 0;//线程池接受连接请求总数
  protected long accumulatedRequestTime = 0;//线程池接受连接请求的总响应时间
  protected long accumulatedCheckoutTime = 0;//线程池中连接的总连接时间(包括已关闭的连接)
  protected long claimedOverdueConnectionCount = 0;//线程池中逾期连接数
  protected long accumulatedCheckoutTimeOfOverdueConnections = 0;//逾期连接的总连接时间
  protected long accumulatedWaitTime = 0;//线程池中获取连接总等待时间(空闲队列为空,活动队列满)
  protected long hadToWaitCount = 0;//线程池中获取连接总等待次数
  protected long badConnectionCount = 0;//线程池中的坏连接数

② 连接对象的代理PooledConnection

PooledConnection实现了InvokeHandler,JDK动态代理的实现

主要看看PooledConnection属性与invoke方法,属性

  private static final String CLOSE = "close";
  private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };
  private final int hashCode; //连接的hashCode
  private final PooledDataSource dataSource; //连接的数据源
  private final Connection realConnection;//真实连接,target object
  private final Connection proxyConnection;//代理连接 proxy object
  private long checkoutTimestamp; // 连接连接时长
  private long createdTimestamp; // 连接的创建时间
  private long lastUsedTimestamp; //  连接的最后使用时间
  private int connectionTypeCode; // 连接数据源属性的hashCode (url+username+password).hashCode
  private boolean valid; //连接有效标识

  public PooledConnection(Connection connection, PooledDataSource dataSource) {
    this.hashCode = connection.hashCode();
    this.realConnection = connection;
    this.dataSource = dataSource;
    this.createdTimestamp = System.currentTimeMillis();
    this.lastUsedTimestamp = System.currentTimeMillis();
    this.valid = true;
    //创建代理对象
    this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
  }

invoke():连接复用的实现之一pushConnection(),执行的时机。

  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
      //调用connection.close()时,不是直接调用真实连接target object的close
      //而是通过proxy object调用PooledDataSource.pushConnection(target object)
      dataSource.pushConnection(this);
      return null;
    } else {
      try {
        if (!Object.class.equals(method.getDeclaringClass())) {
          //非Object方法的其他方法,先检查valid(有效标识),无效抛出异常
          checkConnection();
        }
        //调用target object的方法
        return method.invoke(realConnection, args);
      } catch (Throwable t) {
        throw ExceptionUtil.unwrapThrowable(t);
      }
    }
  }

③ 连接池连接复用实现 :队列实现

出队:popConnection():返回一个连接

  public Connection getConnection(String username, String password) throws SQLException {
    return popConnection(username, password).getProxyConnection();
  }

  private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;
   //重复获取连接直到,获取成功或者获取超时
    while (conn == null) {
      //加锁
      synchronized (state) {
        if (!state.idleConnections.isEmpty()) {
          //空闲连接队列不为空,出队一个连接(连接复用)
          conn = state.idleConnections.remove(0);
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          // 空闲连接队列为空
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // 活动连接队列 < 最大活动连接数,新建一个连接
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {
            // 活动连接队列大小已到达最大活动连接数,取活动时间最长的活动连接(FIFO取队列头poll())
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // 活动连接队列中时长最长的连接 超出了 最大连接时间
              // 更新连接池总逾期连接数+1
              // 更新连接池逾期连接的总连接时间
              // 更新连接池连接的总连接时间
              // 将逾期连接作废:移出活动连接队列+非自动提交连接回退+更改作废标识+创建新的连接代理
              state.claimedOverdueConnectionCount++;
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              state.accumulatedCheckoutTime += longestCheckoutTime;
              // 移出活动连接队列
              state.activeConnections.remove(oldestActiveConnection);
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                //非自动提交连接回退(事务时)
                try {
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  log.debug("Bad connection. Could not roll back");
                }  
              }
              //创建新的连接代理,旧的连接代理作废(invalid=false),注意:保留旧的pooledConnection(可能还在执行),等realConnection.close()一起被GC。
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              // 空闲连接队列为空,活动连接队列为满,并且活动连接中连接时长最长的连接未超时
              // 必须等待
              try {
                if (!countedWait) {
                  //统计等待次数,调优参考参数,过大可能需要设置更大的活动连接数了
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                //生产者--消费者模型,等待唤醒,让出锁+CPU
                state.wait(poolTimeToWait);
                //统计连接池的等待时长
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        if (conn != null) {
          // 获取连接成功,ping数据库服务器,检查连接是否还有效,例如连接闲置时间>数据库超时时间,连接失效  
          if (conn.isValid()) {
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            //连接有效,
            //更新连接属性hashCode,
            //更新获取连接的时间,最后使用连接的时间,并
            //加入到活动连接队列中,
            //连接池请求数+1,请求响应时间统计
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            state.activeConnections.add(conn);
            state.requestCount++;
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            //连接无效
            //更新连接池,坏连接数+1,本地坏连接数+1,
            state.badConnectionCount++;
            localBadConnectionCount++;
            conn = null;
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              //当坏连接数 > 最大空闲连接数 + 本地坏连接容忍数时,抛出异常,
              //此种场景相当于连接池终止服务
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }

    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      //最后检测conn是否获取成功
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }

入队:pushConnection():关闭连接或入空闲连接队列

  protected void pushConnection(PooledConnection conn) throws SQLException {
    //加锁
    synchronized (state) {
      //从活动连接队列中删除
      state.activeConnections.remove(conn);
      if (conn.isValid()) {
        //连接有效
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          //当前空闲连接数小于最大空闲连接数,并且连接属于同一个数据库
          //更新连接池总连接时间
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            //连接不是自动提交,回退(加锁后另外保证策略,清除连接的职责)
            conn.getRealConnection().rollback();
          }
          //创建连接的新代理,旧代理作废
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          //放入到空闲连接队列中,更新连接创建时间,最后使用时间
          state.idleConnections.add(newConn);
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          //设置旧连接无效
          conn.invalidate();
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }
          //生产者--消费者模型,唤醒popConnection中所有等待线程
          state.notifyAll();
        } else {
          //空闲连接数已达到最大,关闭连接
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          conn.getRealConnection().close();
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          conn.invalidate();
        }
      } else {
        //连接无效,更新连接池坏连接数+1
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        state.badConnectionCount++;
      }
    }
  }

3、总结

① UnpooledDataSource是对JDBC的封装使用,PooledDataSource利用装饰者模式增强UnpooledDataSource的功能,实现了线程池。

② 采用队列实现:空闲队列+活动队列,队列最大容量分别为poolMaxinumIdleConnections(默认5),poolMaxinumActiveConnections(默认10)

③ PoolState封装了线程池的运行时数据,这些数据利于监控,调优。例如:等待连接次数过多,很可能是设置的最大活动连接数、最大空闲连接数过小。

④ getConnection实际返回的是连接对象的代理对象,利用JDK动态代理技术,connection.close()实际是调用proxyConneciton.close(),实现连接的回收(入空闲队列),便于复用。

⑤ 活动连接超出连接时间时,请求连接会抢占真实连接,作废旧的代理连接,创建一个新的代理连接。需要注意的是:旧的代理连接按道理会等连接关闭时,一起被GC。

⑥ 利用了生产者-消费者模型实现

⑦ 简单的流程图

 

原文地址:https://www.cnblogs.com/wqff-biubiu/p/12466771.html