Hikari定制连接池容器ConcurrentBag

它是为了实现比 LinkedBlockingQueue 和 LinkedTransferQueue 更高的性能而特别定制的,根据连接池的特殊场景做了一些性能优化

容器类定义

public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable {
   // 底层存储 CopyOnWriteArrayList,可以无锁安全的读取连接池的信息
   private final CopyOnWriteArrayList<T> sharedList;
   private final boolean weakThreadLocals; //默认是false 
	 // 线程隔离的 FastList<PoolEntry>
   private final ThreadLocal<List<Object>> threadList;
	 // 其实就是HikariPool,用来新建PoolEntry
   private final IBagStateListener listener;
	 // 等待获取db连接的线程个数
   private final AtomicInteger waiters;
   private volatile boolean closed;

   private final SynchronousQueue<T> handoffQueue;
  //..
}

容器存储对象(条目)的定义

// 容器存放的的 IConcurrentBagEntry 接口实现类 PoolEntry 
public interface IConcurrentBagEntry
   {  
      // 空闲状态
      int STATE_NOT_IN_USE = 0;
      // 连接在使用
      int STATE_IN_USE = 1;
      // remove的时候先标记为被移除
      int STATE_REMOVED = -1;
      // 被预留状态,不可用但是可以移除 
      // 主要在检查线程中,要对连接进行softEvictConnection,确保能从not-in-use转到 reserved
      int STATE_RESERVED = -2;

      boolean compareAndSet(int expectState, int newState);
      void setState(int newState);
      int getState();
   }

它的几种状态扭转

image.png

连接池容器初始化

public ConcurrentBag(final IBagStateListener listener)
{
	 // 其实就是HikariPool, 负责创建PoolEntry
   this.listener = listener;
	 // false
   this.weakThreadLocals = useWeakThreadLocals();
   this.handoffQueue = new SynchronousQueue<>(true);
   // 等待获取db连接的线程个数
   this.waiters = new **AtomicInteger**();
   //底层存储 CopyOnWriteArrayList
   this.sharedList = new **CopyOnWriteArrayList**<>();
   if (weakThreadLocals) {
      this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
   }
   else {
			// ThreadLocal, 使用自定义的FastList
      this.threadList = ThreadLocal.withInitial(() -> new **FastList**<>(IConcurrentBagEntry.class, 16));
   }
}

容器操作 crud

  1. 从容器获取对象 borrow(long timeout, TimeUnit timeunit)
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
   // Try the thread-local list first
   // **1.** 先从 threadlocal的FastList中取,倒序遍历, 同一个线程归还连接后再获取最新的连接 可避免一些检查
   // 如果能取到就直接返回 这样无需锁
   final List<Object> list = threadList.get();
   for (int i = **list.size() - 1;** i >= 0; i--) {
      final Object entry = list.remove(i);
      @SuppressWarnings("unchecked")
      final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
      if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
         return bagEntry;
      }
   }

   // **2.** threadlocal获取不到,则需要从底层的sharedList中去寻找
   // 累加等待计数
   // Otherwise, scan the shared list ... then poll the handoff queue
   final int waiting = waiters.incrementAndGet();
   try {
      for (T bagEntry : sharedList) {
         if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            // If we may have stolen another waiter's connection, request another bag add.
            if (waiting > 1) {
               listener.addBagItem(waiting - 1);
            }
            return bagEntry;
         }
      }

		  // 如果连接池里边无空闲连接了,则需要HikariPool去增加连接
      listener.addBagItem(waiting);
		  
      // 进入超时等待循环 
      timeout = timeUnit.toNanos(timeout);
      do {
         final long start = currentTime();
         final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
         if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }

         timeout -= elapsedNanos(start);
      } while (timeout > 10_000);

      return null;
   }
   finally {
     // 扣减等待计数
      waiters.decrementAndGet();
   }
  1. 新增对象 add(final T bagEntry)
public void add(final T bagEntry)
{
   if (closed) {
      LOGGER.info("ConcurrentBag has been closed, ignoring add()");
      throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
   }

   sharedList.add(bagEntry);

   // spin until a thread takes it or none are waiting
   while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !**handoffQueue.offer(bagEntry))** {
      Thread.yield();
   }
}

将对象塞到底层 copyOnWriteArrayList里,如果有线程在等待连接,会把新增的这个 bagEntry塞到 handoffQueue,给阻塞在 borrow() 超时等待的线程

  1. 归还对象 requite(final T bagEntry)
public void requite(final T bagEntry)
{
   bagEntry.setState(STATE_NOT_IN_USE);

   for (int i = 0; waiters.get() > 0; i++) {
      if (bagEntry.getState() != STATE_NOT_IN_USE || **handoffQueue.offer(bagEntry)**) {
         return;
      }
      else if ((i & 0xff) == 0xff) {
         parkNanos(MICROSECONDS.toNanos(10));
      }
      else {
         Thread.yield();
      }
   }

   final List<Object> threadLocalList = threadList.get();
   if (threadLocalList.size() < 50) {
      **threadLocalList**.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
   }
}
  • 状态置为空闲 STATE_NOT_IN_USE
  • 如果此时有线程在等待db连接,则会把这个连接 交给 handoffQueue 直接让阻塞在 borrow 的线程获取到
  • 然后把这个连接存到 threadLocal里,这样当前线程如果再次获取db连接的话就会非常快
  1. 移除连接 remove(final T bagEntry)
public boolean remove(final T bagEntry)
{
   // 1.
   if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
      LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
      return false;
   }
   // 2.
   final boolean removed = sharedList.remove(bagEntry);
   if (!removed && !closed) {
      LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
   }
	 // 3.
   threadList.get().remove(bagEntry);

   return removed;
}
  • 只有空闲状态(NOT_IN_USE) 和 保留状态(RESERVED) 的连接才能被移除和关闭
  • 从底层 sharedList 中移除
  • 从 threadLocal 中移除

为什么 hikari 号称最快的连接池?

  • 采用 threadLocal 缓存同一个线程使用过的db连接,并且是从最近一个开始遍历int i = list.size() - 1**;**, 这样同一个java线程多次获取db连接的时候就能快速获取到连接,而且获取到最鲜活的,还可避免 HikariPool#getConnection 的连接可用性的检查
  • ConcurrentBag 底层 sharedList 用的 copyOnWriteArrayList,所以读和写之间就不需要加锁了
  • 监控连接池状态 统计idle、active、total等个数的时候加锁的话, 就会阻塞住 add borrow 等操作
  • ConcurrentBag 申请连接borrow 和归还连接 requite 只是进行 cas 状态变更的无锁操作,并不会从 copyOnWriteArrayList 移除元素
  • 单线程的线程池:新建和关闭连接的线程池都是单线程的,也就避免了线程间的协调开销
  • handoffQueue: SynchronousQueue 当申请连接 borrow() 的时候,如果空闲连接不够用,线程就会阻塞在 handoffQueue#poll ;
    在连接新增 add() 或归还 requite() 时,如果发现有线程在等待中则会把这个连接交给 handoffQueue ,这样等待中的线程直接能直接获取到(hand off很形象啊),而不用再遍历 sharedList 同时也保障了公平性 先到先得
  • minimumIdle 默认等于 maximumPoolSize, 尽量减少在申请连接的时候还需要创建连接的开销

本文来自博客园,作者:mushishi,转载请注明原文链接:https://www.cnblogs.com/mushishi/p/14665235.html

原文地址:https://www.cnblogs.com/mushishi/p/14665235.html