基于Netty包中的Recycler实现的对象池技术详解

一、业务背景

当项目中涉及到频繁的对象的创建和回收的时候,就会出现频繁GC的情况,这时就出现了池化的技术来实现对象的循环使用从而避免对象的频繁回收,Netty包下的Recycler就实现了这一功能。当创建对象的时候直接从池中获取,但使用完毕进行回收的时候,

直接将对象回收到池中,这样可以大量减少对象的创建和回收,是对JVM优化的很好的手段

二、Recycler的使用案例

首先定义需要池化的对象User

 1 public class User
 2 {
 3     /**
 4      * 初始化对象池
 5      * */
 6     private static final Recycler<User> RECYCLER = new Recycler<User>() {
 7         @Override
 8         protected User newObject(Handle<User> handle) {
 9             //如果对象池中没有对象,则调用此方法创建默认的对象实例
10             return new User(handle);
11         }
12     };
13     
14     /**
15      * 获取对象实例,从对象池中获取
16      * */
17     public static User getInstance(){
18         return RECYCLER.get();
19     }
20     
21     /**
22      * handle是对象的封装类,可以看做一个handle就对应一个User
23      * */
24     private final Recycler.Handle<User> handle;
25     
26     /**
27      * 对象的构造方法,采用private,避免通过构造方法直接创建对象实例
28      * */
29     private User(Handle<User> handle){
30         System.out.println(Thread.currentThread().getName()+"创建对象:"+this.toString());
31         this.handle = handle;
32     }
33     
34     /**
35      * 回收对象
36      * */
37     public void recycle(){
38         //将当前对象进行回收到池中
39         System.out.println("回收对象:"+this.toString());
40         this.handle.recycle(this);
41     }
42     
43     /**
44      * 初始化对象的值,给从池中获取的对象进行初始化赋值
45      * */
46     public void init(Long userId, String userName){
47         this.userId = userId;
48         this.userName = userName;
49     }
50     
51     private Long userId;
52     
53     private String userName;
54     
55     public Long getUserId()
56     {
57         return userId;
58     }
59 
60     public void setUserId(Long userId)
61     {
62         this.userId = userId;
63     }
64 
65     public String getUserName()
66     {
67         return userName;
68     }
69 
70     public void setUserName(String userName)
71     {
72         this.userName = userName;
73     }
74 }

测试案例的Main方法如下:

 1 public class RecyclerMain
 2 {
 3     
 4     public static void main(String[] args) throws InterruptedException
 5     {
 6         Thread t1 = new Thread(new Runnable()
 7         {
 8             
 9             @Override
10             public void run()
11             {
12                 //1.从对象池中获取对象,由于对象池中没有空对象,所以新建
13                 User user1 = User.getInstance();
14                 System.out.println(user1.toString());
15                 //2.从对象池中获取对象,由于对象池中没有空对象,所以新建
16                 User user2 = User.getInstance();
17                 System.out.println(user2.toString());
18                 //3.将对象user2回收
19                 user2.recycle();
20                 //从对象池中获取对象,由于对象池中有回收的user2,所以直接返回user2对象
21                 User user3 = User.getInstance();
22                 System.out.println(user3.toString());
23                 //将对象进行回收
24                 user1.recycle();
25             }
26         });
27         t1.start();
28         
29         //睡眠1秒,确保线程t1将user1进行了回收
30         Thread.sleep(1000L);
31     
32         //1.从对象池中获取对象,虽然线程t1的对象池中已经回收了user1对象,但是所属的线程是t1的线程池
33         //而当前线程是Main线程,main线程的对象池中是没有对象的,所以会新建对象
34         User user1 = User.getInstance();
35         System.out.println(user1.toString());
36         //回收user1对象
37         user1.recycle();
38         //获取的对象是刚刚被回收的user1对象
39         User user2 = User.getInstance();
40         System.out.println(user2.toString());
41     }
42 }

执行结果如下:

 1 Thread-0创建对象:com.mrhu.opin.recycle.User@59f45845
 2 com.mrhu.opin.recycle.User@59f45845
 3 Thread-0创建对象:com.mrhu.opin.recycle.User@8d37841
 4 com.mrhu.opin.recycle.User@8d37841
 5 回收对象:com.mrhu.opin.recycle.User@8d37841
 6 com.mrhu.opin.recycle.User@8d37841
 7 回收对象:com.mrhu.opin.recycle.User@59f45845
 8 main创建对象:com.mrhu.opin.recycle.User@6d06d69c
 9 com.mrhu.opin.recycle.User@6d06d69c
10 回收对象:com.mrhu.opin.recycle.User@6d06d69c
11 com.mrhu.opin.recycle.User@6d06d69c

有一点需要注意:对象池的生命周期是和线程绑定的,也就是说不同的线程会有不同的线程池,所以如果线程1将对象回收了,线程2从对象池中获取对象是获取不到线程1回收的对象的。

但是存在一种情况,创建对象的时候是通过线程1创建的,但是回收的时候是通过线程2回收的,这样的场景会出现什么的结果呢?案例如下:

 1 public class RecyclerMain
 2 {
 3     static User user = null;
 4     
 5     public static void main(String[] args) throws InterruptedException
 6     {
 7         //从Main线程的对象池中获取对象
 8         user = User.getInstance();
 9         System.out.println(user);
10         
11         Thread t1 = new Thread(new Runnable()
12         {   
13             @Override
14             public void run()
15             {
16                 //回收Main线程创建的user对象
17                 user.recycle();
18             }
19         });
20         t1.start();
21         
22         //睡眠1秒,确保线程t1将对象user回收了
23         Thread.sleep(1000);
24         
25         //从main线程的对象池中获取对象,虽然main线程没有回收对象,但是线程t1已结将对象user进行了回收
26         //所以user2将会从对象池中获取对象
27         User user2 = User.getInstance();
28         //user2的toString和user的toString相等
29         System.out.println(user2.toString());
30     }
31 }

结果如下:

1 main创建对象:com.mrhu.opin.recycle.User@8807e25
2 com.mrhu.opin.recycle.User@8807e25
3 线程Thread-1回收对象:com.mrhu.opin.recycle.User@8807e25
4 com.mrhu.opin.recycle.User@8807e25

可以得出结论:对象是和线程绑定的,也就是说对象是哪个线程创建的,那么不管这个对象是通过哪个线程回收的,最终都会回收到创建这个对象的线程池中去。

三、Recycler的实现原理

目前已经了解了对象池的大致用法,那么现在再了解下Recycler的实现原理是如何的。目前知道的情况是Recycler是和线程绑定的,那么很容易就想到了一个工具-----ThreadLocal

另外对象池存储对象地方需要提供两个方法,一个是将对象放回池,一个是从池中获取对象,Java中很多集合都可以实现这样的功能,而Recycler采用的是栈这样的数据结构,实现是Recycler的一个内部类Stack。

由于一个线程对应一个对象池,所以每个线程也对应一个Stack对象

Stack源码如下:

Stack的属性

 1 final Recycler<T> parent;//所属的Recycler对象
 2 
 3     final WeakReference<Thread> threadRef;//绑定的线程引用,是个弱引用
 4     final AtomicInteger availableSharedCapacity;//stack的容量
 5     final int maxDelayedQueues; //最大延迟队列的长度
 6 
 7     private final int maxCapacity;//最大容量
 8     private final int ratioMask;
 9     private DefaultHandle<?>[] elements;//存储对象的数组,每个对象被封装成DefaultHandle对象
10     private int size;//当前stack的大小
11     private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
12     private WeakOrderQueue cursor, prev;//执行当前和上一个WeakOrderQueue
13     private volatile WeakOrderQueue head;//执行头WeakOrderQueue
14 
15     Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
16           int ratioMask, int maxDelayedQueues) {
17         this.parent = parent;
18         threadRef = new WeakReference<Thread>(thread);
19         this.maxCapacity = maxCapacity;
20         availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
21         elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
22         this.ratioMask = ratioMask;
23         this.maxDelayedQueues = maxDelayedQueues;
24     }

对象池中的对象被封装成了Handle对象,Handle的默认实现类是DefaultHandler,源码如下:

 public interface Handle<T> {
        void recycle(T object);
    }

static final class DefaultHandle<T> implements Handle<T> {
        private int lastRecycledId;
        private int recycleId;

        boolean hasBeenRecycled;

        private Stack<?> stack;
        private Object value;

        DefaultHandle(Stack<?> stack) {
            this.stack = stack;
        }

        @Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }
            stack.push(this);
        }
    }

可以看出每个对象的封装类DefaultHandler内部是有一个Stack的引用,所以说每个对象创建之后就会绑定一个stack,而stack绑定了一个线程,所以可以说每一个对象是和一个线程绑定的,从哪个线程创建就会从哪个线程回收。

了解了对象的存储结构之后,再看下对象是如何获取的。对象的获取是通过对象池的get()方法进行获取对象,源码如下:

 1 public final T get() {
 2         if (maxCapacityPerThread == 0) {
 3             return newObject((Handle<T>) NOOP_HANDLE);
 4         }
 5        //从threadLocal对象中获取本线程的Stack对象
 6         Stack<T> stack = threadLocal.get();
 7        //调用stack的pop方法获取一个对象
 8         DefaultHandle<T> handle = stack.pop();
 9         if (handle == null) {
10             //如果从stack中没有获取到对象,就创建一个新对象
11             handle = stack.newHandle();
12             handle.value = newObject(handle);
13         }
14         return (T) handle.value;
15     }

首先是从ThreadLocal对象中获取本线程的Stack对象,然后从Stack中获取对象,如果没有获取到对象,就创建新对象。这里的逻辑比较简单,再看下Stack的pop方法是如何实现的。

 1  DefaultHandle<T> pop() {
 2         int size = this.size;//获取当前stack的大小
 3         if (size == 0) {
 4             if (!scavenge()) {//如果stack没有存储对象,那么就从WeakOrderQueue中回收对象
 5                 return null;
 6             }
 7             size = this.size;//重新获取当前stack的大小
 8         }
 9         //获取之后size自减
10         size --;
11         //获取数组的最后一个元素,stack的结构就是先进后出,后进先出
12         DefaultHandle ret = elements[size];
13         elements[size] = null;
14         if (ret.lastRecycledId != ret.recycleId) {
15             throw new IllegalStateException("recycled multiple times");
16         }
17         ret.recycleId = 0;
18         ret.lastRecycledId = 0;
19         this.size = size;
20         return ret;
21     }

逻辑也不复杂,就是从Stack内部的数组中返回最后一个元素返回,这里有一个方法scavenge(),这个方法比较关键,是当stack中没有元素时,就先尝试执行scavenge方法从WeakOrderQueue中先回收对象到Stack中,后面再详解

了解了获取对象的方法实现之后,再看下回收对象的实现逻辑

对象的回收方法是调用了DefaultHandle的recycle方法,源码如下:

1 @Override
2         public void recycle(Object object) {
3             if (object != value) {
4                 throw new IllegalArgumentException("object does not belong to handle");
5             }
6            //直接将当前对象push到stack中
7             stack.push(this);
8         }

代码不多,逻辑就是将当前的对象重新push到当前线程的Stack中去,Stack的push方法如下:

 1 void push(DefaultHandle<?> item) {
 2         Thread currentThread = Thread.currentThread();
 3         //判断stack绑定的线程是否的当前的线程
 4         if (threadRef.get() == currentThread) {
 5             //stack绑定的线程就是当前线程,也就是创建对象和回收对象是同一个线程的情况
 6             pushNow(item);
 7         } else {
 8             //stack绑定的线程不是当前线程,也就是创建对象和回收对象不是同一个线程的情况
 9             pushLater(item, currentThread);
10         }
11     }

这里的push方法分成了两种情况,一种是回收的线程和创建对象的线程是同一个线程,一种是回收的线程和创建对象的线程不是同一个线程的情况。那就先看同一个线程的情况

 1 private void pushNow(DefaultHandle<?> item) {
 2         //判断当前对象是否已经回收
 3         if ((item.recycleId | item.lastRecycledId) != 0) {
 4             throw new IllegalStateException("recycled already");
 5         }
 6         //设置当前回收ID
 7         item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
 8 
 9         //判断当前stack的大小是否已经超过了最大容量
10         int size = this.size;
11         if (size >= maxCapacity || dropHandle(item)) {
12             // Hit the maximum capacity or should drop - drop the possibly youngest object.
13             return;
14         }
15         //如果当前stack的大小等于数组的大小,那么就需要扩容,默认是扩容两倍
16         if (size == elements.length) {
17             elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
18         }
19 
20         //设置数组的最后一位为回收的对象
21         elements[size] = item;
22         this.size = size + 1;
23     }

整体逻辑也不复杂,就是将回收的对象存入内部数组的最后一位,之前如果数组满了就进行扩容,默认是扩容两倍

目前为止,单线程使用的情况下的获取和回收基本上流程已经很清楚了,但是在多线程的情况下,其他线程回收了创建对象的线程的对象的时候,需要怎么进行回收呢?创建对象的线程又是怎么将其他线程回收的对象回收到本线程呢?

这里就是涉及到了另一个数据结构,WeakOrderQueue,看名字就能猜到是一个弱引用的有序队列。

 1 static final WeakOrderQueue DUMMY = new WeakOrderQueue();
 2 
 3     @SuppressWarnings("serial")
 4     //内部类Link对象,是一个链表存储对象,链表的节点是一个对象数组
 5     private static final class Link extends AtomicInteger {
 6         private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
 7 
 8         private int readIndex;
 9         private Link next;
10     }
11 
12     //指向头Link节点和尾Link节点
13     private Link head, tail;
14     //指向同一个stack的下一个WeakOrderQueue
15     private WeakOrderQueue next;
16     //当前线程的引用
17     private final WeakReference<Thread> owner;
18     //自增的ID
19     private final int id = ID_GENERATOR.getAndIncrement();
20     private final AtomicInteger availableSharedCapacity;

 WeakOrderQueue的结构如下图示,内部是一个Link的链表,每个Link是一个对象的数组,每个WeakOrderQueue又指向下一个WeakOrderQueue,而这WeakOrderQueue组成的链表是属于同一个Stack的。

简单点说,如果线程A创建了对象,那么从线程B回收的对象,不是直接回收到线程A的Stack中,而是创建一个WeakOrderQueue,并且将回收的对象放入WeakOrderQueue中,并且将Stack对象的head指向这个WeakOrderQueue,

此时如果线程C先回收线程A的对象,也和线程B一样创建一个WeakOrderQueue,并且将Stack的head执行这个WeakOrderQueue,并且将这个head的next执行原先为head的WeakOrderQueue。所以一个线程的对象池中存储的对象包含

了线程绑定的Stack,还包含了这个Stack指向的WeakOrderQueue链表,这些WeakOrderQueue就是其他线程回收这个线程的对象存储的地方。那么Stack如何使用这些WeakOrderQueue中的对象呢? 答案就是在获取对象的时候执行了scavenge方法,源码如下:

 1 boolean scavenge() {
 2         // continue an existing scavenge, if any
 3         if (scavengeSome()) {
 4             return true;
 5         }
 6 
 7         // reset our scavenge cursor
 8         prev = null;
 9         cursor = head;
10         return false;
11     }
12     
13     boolean scavengeSome() {
14         //上一个WeakOrderQueue引用
15         WeakOrderQueue prev;
16         //当前的WeakOrderQueue引用
17         WeakOrderQueue cursor = this.cursor;
18         if (cursor == null) {
19             prev = null;
20             cursor = head;
21             if (cursor == null) {
22                 return false;
23             }
24         } else {
25             prev = this.prev;
26         }
27 
28         boolean success = false;
29         do {
30             //调用WeakOrderQueue的transfer方法转化
31             if (cursor.transfer(this)) {
32                 success = true;
33                 break;
34             }
35             WeakOrderQueue next = cursor.next;
36             if (cursor.owner.get() == null) {
37                 // If the thread associated with the queue is gone, unlink it, after
38                 // performing a volatile read to confirm there is no data left to collect.
39                 // We never unlink the first queue, as we don't want to synchronize on updating the head.
40                 if (cursor.hasFinalData()) {
41                     for (;;) {
42                         if (cursor.transfer(this)) {
43                             success = true;
44                         } else {
45                             break;
46                         }
47                     }
48                 }
49 
50                 if (prev != null) {
51                     prev.setNext(next);
52                 }
53             } else {
54                 prev = cursor;
55             }
56 
57             cursor = next;
58 
59         } while (cursor != null && !success);
60 
61         this.prev = prev;
62         this.cursor = cursor;
63         return success;
64     }

实际就是从head指针开始遍历WeakOrderQueue,调用WeakOrderQueue的transfer方法,源码如下:

 1 boolean transfer(Stack<?> dst) {
 2             Link head = this.head;
 3             if (head == null) {
 4                 return false;
 5             }
 6 
 7             if (head.readIndex == LINK_CAPACITY) {
 8                 if (head.next == null) {
 9                     return false;
10                 }
11                 this.head = head = head.next;
12             }
13 
14             final int srcStart = head.readIndex;
15             int srcEnd = head.get();
16             final int srcSize = srcEnd - srcStart;
17             if (srcSize == 0) {
18                 return false;
19             }
20 
21             final int dstSize = dst.size;
22             final int expectedCapacity = dstSize + srcSize;
23 
24             if (expectedCapacity > dst.elements.length) {
25                 final int actualCapacity = dst.increaseCapacity(expectedCapacity);
26                 srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
27             }
28 
29             if (srcStart != srcEnd) {
30                 final DefaultHandle[] srcElems = head.elements;
31                 final DefaultHandle[] dstElems = dst.elements;
32                 int newDstSize = dstSize;
33                 for (int i = srcStart; i < srcEnd; i++) {
34                     DefaultHandle element = srcElems[i];
35                     if (element.recycleId == 0) {
36                         element.recycleId = element.lastRecycledId;
37                     } else if (element.recycleId != element.lastRecycledId) {
38                         throw new IllegalStateException("recycled already");
39                     }
40                     srcElems[i] = null;
41 
42                     if (dst.dropHandle(element)) {
43                         // Drop the object.
44                         continue;
45                     }
46                     element.stack = dst;
47                     dstElems[newDstSize ++] = element;
48                 }
49 
50                 if (srcEnd == LINK_CAPACITY && head.next != null) {
51                     // Add capacity back as the Link is GCed.
52                     reclaimSpace(LINK_CAPACITY);
53 
54                     this.head = head.next;
55                 }
56 
57                 head.readIndex = srcEnd;
58                 if (dst.size == newDstSize) {
59                     return false;
60                 }
61                 dst.size = newDstSize;
62                 return true;
63             } else {
64                 // The destination stack is full already.
65                 return false;
66             }
67         }

代码较长,大致逻辑就是将WeakOrderQueue的Link中的对象存入Stack的内部数组中,这里有个逻辑是 dropHandle方法

 1 boolean dropHandle(DefaultHandle<?> handle) {
 2      if (!handle.hasBeenRecycled) {
 3          if ((++handleRecycleCount & ratioMask) != 0) {
 4              // Drop the object.
 5              return true;
 6          }
 7          handle.hasBeenRecycled = true;
 8      }
 9      return false;
10  }

 这个方法逻辑是判断当前回收的对象是否回收超过了上限,默认的ratioMask的值为8,也就是如果一个对象回收了8次之后,就直接删除此对象,不对此对象进行回收。

除了Stack和WeakOrderQueue,Recycler内部还在多个地方使用了ThreadLocal的优化版本FastThreadLocal,主要有两个地方使用到了,

一个是FastThreadLocal<Stack>,用于存储每个线程的Stack,还有一个是FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>,用于存储一个Map<Stack,WeakOrderQueue>,但异线程回收对象的时候,从ThreadLocal中获取这个Map,然后根据目标线程的Stack为key获取这个Stack

的WeakOrderQueue,然后将回收的对象存入这个WeakOrderQueue中。

原文地址:https://www.cnblogs.com/jackion5/p/11369705.html