三步创建Disruptor应用
//第二步,创建消息处理的processors: disruptor.handleEventsWith(Consumer::handleEvent1); disruptor.handleEventsWith(Consumer::handleEvent2); disruptor.start(); System.err.println("已经启动"); //生产: Producer.produceEvents(disruptor);
Disruptor是一个高性能的用于线程间消息处理的开源框架。它的目标就是快.
我们知道,java.util.concurrent.ArrayBlockingQueue 是一个非常优秀的有界队列实现。Disruptor与之相比,性能更加的优秀。
Disruptor内部使用了RingBuffer,它是Disruptor的核心的数据结构。和其它的RingBuffer实现不同,Disruptor没有尾指针。这样实现是经过深思熟虑的,你可以看这篇文档了解其细节。
更多的参考资料请参照官方文档以及并发编程网上翻译的一些文章。
本文主要参考Disruptor入门这篇文章。
本文的代码已全部放在github上。
在正式使用Disruptor之前,我们先声明一个ObjectEvent类,它用来传递消息的内容。
public class ObjectEvent { private Object object; public Object getObject() { return object; } public ObjectEvent setObject(Object object) { this.object = object; return this; } }
第一步,创建一个Disruptor对象
这是一个单一生产者的例子,如果在你的代码中仅仅有一个事件生产者,那么可以设置为单一生产者模式来提高系统的性能。
第一个参数用来在ring buffer中创建event,第二个参数是ring buffer的大小,第三个参数是消费者处理消息而使用的线程池。第四个参数是单或者多生产者模式,地五个参数是可选的等待策略。
以上代码主要用来设置RingBuffer.
//第一步,创建一个Disruptor对象: Executor executor = Executors.newCachedThreadPool(); int bufferSize = 1024; Disruptor<ObjectEvent> disruptor = new Disruptor<>( ObjectEvent::new, bufferSize, executor, ProducerType.SINGLE, new LiteBlockingWaitStrategy() );
第二步,创建消息处理的processors
//第二步,创建消息处理的processors: disruptor.handleEventsWith(Consumer::handleEvent1); disruptor.handleEventsWith(Consumer::handleEvent2); disruptor.start(); System.err.println("已经启动"); //生产: Producer.produceEvents(disruptor);
定义了两个processor,并使用handleEventsWith注册到Disruptor。注意这个方法可以使用职责链模式,例如handleEventsWith(A).then(B)
。
然后就可以启动Disruptor了:
第三步,创建生产者
通过以上三步,我们就可以创建一个简单的应用Disruptor的例子了
public class Producer { //第三步,创建生产者: public static void produceEvents(Disruptor<ObjectEvent> disruptor) throws InterruptedException { RingBuffer<ObjectEvent> ringBuffer = disruptor.getRingBuffer(); for (long l = 0; true; l++) { String obj = "Test-" + l; ringBuffer.publishEvent((event, sequence) -> event.setObject(obj)); TimeUnit.MINUTES.sleep(1); } } }