第四部分-并发编程案例分析3:高性能队列Disruptor

1.为什么要用高性能队列Disruptor

为什么要说Disruptor?java SDK提供了2个有界队列
ArrayBlockQueue,LinkedBlockingQueue,基于ReentrantLock锁实现,在高并发情况下,锁的效率不高,更好的替代品有木有?Dosritpr

2.Disruptor介绍

性能更高的有界队列

Log4j2,Spring Messageing,HBase,Storm都用了Disruptor

为什么性能高?
1.内存分配连续,使用RingBuffer数据结构,数组元素初始化时一次性创建,缓存命中率更高;对象循环引用,避免频繁GC
2.避免伪共享,提升缓存利用率。
3.无锁算法,避免频繁加锁,解锁的性能消耗。
4.支持批量消费,消费者可以无锁消费多个消息

3.Disruptor 使用

  • 生产者生产的对象称为Event,使用Disruptor必须定义Event,实例代码中为LongEvent
  • 构建Disruptor要指定度列大小(bufferSize),还需要传入EventFactory,示例代码为LongEvent::new
  • 消费Event需要通过handleEventsWith()方法注册一个事件处理器,发布Event则需要通过publishEvent()放阿飞

伪代码


//自定义Event
class LongEvent {
  private long value;
  public void set(long value) {
    this.value = value;
  }
}
//指定RingBuffer大小,
//必须是2的N次方
int bufferSize = 1024;

//构建Disruptor
Disruptor<LongEvent> disruptor 
  = new Disruptor<>(
    LongEvent::new,
    bufferSize,
    DaemonThreadFactory.INSTANCE);

//注册事件处理器
disruptor.handleEventsWith(
  (event, sequence, endOfBatch) ->
    System.out.println("E: "+event));

//启动Disruptor
disruptor.start();

//获取RingBuffer
RingBuffer<LongEvent> ringBuffer 
  = disruptor.getRingBuffer();
//生产Event
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++){
  bb.putLong(0, l);
  //生产者生产消息
  ringBuffer.publishEvent(
    (event, sequence, buffer) -> 
      event.set(buffer.getLong(0)), bb);
  Thread.sleep(1000);
}

测试代码

@RunWith(SpringRunner.class)
@SpringBootTest(classes = FamaMonitorApplication.class)
public class DisruptorTest {

    class MessageEvent {
        public String message;
    }

    private static final int BUFFER_SIZE = 1024;

    @Test
    public void test(){
        // 构建disruptor
        Disruptor<MessageEvent> disruptor = new Disruptor<MessageEvent>(
                MessageEvent::new,
                BUFFER_SIZE,
                DaemonThreadFactory.INSTANCE
        );
        // 注册消费者,并编写消费逻辑
        disruptor.handleEventsWith(((event, sequence, endOfBatch) -> {
            Thread.sleep(1000);
            System.out.println("Message : " + event.message);
        }));

        // 启动
        disruptor.start();

        // 获取队列,然后生产
        RingBuffer<MessageEvent> ringBuffer = disruptor.getRingBuffer();
        for (int i = 0; true; i++) {
            int finalI = i;
            ringBuffer.publishEvent(((event, sequence, args) -> {
                event.message = finalI + "";
            }));
            Thread.yield();
        }

    }
}

4.Disruptor 提升性能的秘籍1-内存连续分配

ArrayBlockQueue底层使用数组+ReentrantLock锁实现。
RingBuffer本质也是数组,这一点性能没有什么区别,区别在于Disruptor对于RingBuffer基础上做了很多优化,其中重要一项就是内存连续分配

5.程序的局部性原理

一段时间内程序的执行会限定在一个局部范围内

时间局限性:程序的某条指令一旦被执行,不久之后,这条指令可能再次被执行,某条数据被访问,不就之后这条数据可能再次被访问。
空间局限期:某块内存一旦被访问,不久之后这块内存附近的内存也有可能被访问。

6.cpu寄存器,cpu缓存

cpu缓存说的就是cpu的寄存器,cpu从内存中加载数据x时,会将x缓存在寄存器里,实际上cpu缓存x的同时,还缓存了x周围的数据,(根据程序的空间局限期,周围数据也有可能被访问)。程序能够体现局部性原理,就可以更好的利用cpu寄存器,从而提升性能。
cpu寄存器的读取速度和内存读取速度不是一个数量级。

7.ArrayBlockQueue有没有利用局部性原理,提升cpu缓存命中率?

并没有
每次队列中添加元素,创建对象E,这个对象都是由生产者线程创建的,由于创建这些元素的时间是离散不连续的,这些元素的内存地址大概率不连续

image

8.Disruptor利用空间局限期原理,提升了cpu缓存命中率

RingBuffer内部也是数组,但数组中的元素都是在初始化时一次性创建的,这些元素的内存地址大概率是连续的


for (int i=0; i<bufferSize; i++){
  //entries[]就是RingBuffer内部的数组
  //eventFactory就是前面示例代码中传入的LongEvent::new
  entries[BUFFER_PAD + i] 
    = eventFactory.newInstance();
}

9.数组中的元素内存地址连续就能提升性能么?

对,可以
消费线程在消费队列时,遵循空间局限性原则,消费完第一个元素,很快就会消费第二个元素。消费第一个元素E1的时候,cpu会把内存E1后面的数据也加载进cpu寄存器里。如果第二个元素E2和E1的内存地址连续的。E2也会被加进Cache中。消费E2的时候,E2已经在cpu寄存器中了,不需要从内存再加载一遍,大大提升了性能了就
image

10.生产者生产时,不是往队列中add,而是修改队列中的元素对象

publishEvent()发布Event时,并不创建新Event,而是event.set()修改event
RingBuffer创建的event是可以循环利用的。这样还能避免频繁创建,删除Event导致频繁GC问题

11.避免伪共享,避免缓存不可用也同样重要

伪共享的存在,使得cache失效

cpu内存的缓存是按照缓存行(Cache Line)管理的,缓存行的大小通常是64字节;cpu从内存加载数据x,会同时加载x后面的(64-size(x))的数据。

Ar'rayBlockQueue的内部结构


/** 队列数组 */
final Object[] items;
/** 出队索引 */
int takeIndex;
/** 入队索引 */
int putIndex;
/** 队列中元素总数 */
int count;

cpu从内存加载takeIndex时,会连带将putIndex和count都加载进cache。
image

线程A运行在cpu1核上,入队操作,入队会修改putIndex,修改putIndex会导致所有核的缓存均实效,此时线程B在cpu2核上执行出队,需要获取takIndex,takeIndex缓存已实效,所以必须重新从内存里再拉取一遍,性能就下来了。。

12.taskIndex缓存为什么会失效?

入队操作不会修改takeIndex,但是takeIndex和putIndex共享一个缓存行,导致出队操作利用不了缓存,这就是伪共享带来的问题,性能并没有带来替身。即使数组中的对象内存地址是连续的。

13.如何避免伪共享带来的缓存不可用情况出现?

每个变量独占一个缓存行,不共享缓存行就可以
putIndex是一个缓存行
takeIndex是另一个缓存行

如何保证putIndex是一个缓存行呢?
在taskIndex的前后各填充56个字节,这样就能保证takeIndex独占一个缓存行


//前:填充56字节
class LhsPadding{
    long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding{
    volatile long value;
}
//后:填充56字节
class RhsPadding extends Value{
    long p9, p10, p11, p12, p13, p14, p15;
}
class Sequence extends RhsPadding{
  //省略实现
}

14.综上所述

数组中连续的内存地址对象+排除伪共享情况,
就可以充分的利用cpu的缓存提升性能
以上代码设计上面的改动,说白了就是更极端的压榨硬件cpu缓存的性能

15.无锁算法

ArrayBlockQueue中使用了ReentrantLock锁,高并发情况下,性能不高
而Disruptor中采用无锁算法,没有加锁,解锁的性能消耗影响

看上去有点复杂,目前没看太明白,总之性能要优于使用ReentrantLock锁
基本逻辑是:如果没有足够的空余位置,就出让 CPU 使用权,然后重新计算;反之则用 CAS 设置入队索引


//生产者获取n个写入位置
do {
  //cursor类似于入队索引,指的是上次生产到这里
  current = cursor.get();
  //目标是在生产n个
  next = current + n;
  //减掉一个循环
  long wrapPoint = next - bufferSize;
  //获取上一次的最小消费位置
  long cachedGatingSequence = gatingSequenceCache.get();
  //没有足够的空余位置
  if (wrapPoint>cachedGatingSequence || cachedGatingSequence>current){
    //重新计算所有消费者里面的最小值位置
    long gatingSequence = Util.getMinimumSequence(
        gatingSequences, current);
    //仍然没有足够的空余位置,出让CPU使用权,重新执行下一循环
    if (wrapPoint > gatingSequence){
      LockSupport.parkNanos(1);
      continue;
    }
    //从新设置上一次的最小消费位置
    gatingSequenceCache.set(gatingSequence);
  } else if (cursor.compareAndSet(current, next)){
    //获取写入位置成功,跳出循环
    break;
  }
} while (true);

16.jvm对于伪代码共享的支持

@sun.misc.Contended
注解可以轻松避免伪共享(前提是jvm参数-XX:-RestrictContended),以牺牲内存为代价。

原创:做时间的朋友
原文地址:https://www.cnblogs.com/PythonOrg/p/14899395.html