Disruptor叫无锁、高并发、环形Buffer,直接覆盖(不用清除)旧的数据,降低GC频率,用于生产者消费者模式(如果说按照设计者角度来讲他就是观察者模式)。什么叫观察者模式,想象一下,我们在前面学各种各样的队列的时候,队列就是个容器,好多生产者往里头扔东西,好多消费者从里头往外拿东西。所谓的生产者消费者就是这个意思,为什么我们可以叫他观察者呢,因为这些消费者正在观察着里面有没有新东西,如果有的话我马上拿过来消费,所以他也是一种观察者模式。Disruptor实现的就是这个容器
- 对比ConcurrentLinkedQueue : 链表实现
- JDK中没有ConcurrentArrayQueue
- Disruptor是数组实现的
- 无锁,高并发,使用环形Buffer,直接覆盖(不用清除)旧的数据,降低GC频率
- 实现了基于事件的生产者消费者模式(观察者模式)
ConcurrentLinkedQueue这里面就是一个一个链表,这个链表遍历起来肯定没有数组快,这个是一点。还有第二点就是这个链表要维护一个头指针和一个尾指针,我往头部加的时候要加锁,往尾部拿的时候也要加锁。另外链表本身效率就偏低,还要维护两个指针。关于环形的呢,环形本身就维护一个位置,这个位置称之为sequence序列,这个序列代表的是我下一个有效的元素指在什么位置上,就相当于他只有一个指针来回转。加在某个位置上怎么计算:直接用那个数除以我们整个的容量求余就可以了。
- 环形队列
- RingBuffer的序号,指向下一个可用的元素
- 采用数组实现,没有首尾指针
- 对比ConcurrentLinkedQueue,用数组实现的速度更快
假如长度为8,当添加到第12个元素的时候在哪个序号上呢?用12%8决定当Buffer被填满的时候到底是覆盖还是等待,由Producer决定
长度设为2的n次幂,利于二进制计算,例如:12%8 = 12 & (8 - 1) pos = num & (size -1)
由于它会采用覆盖的方式,所以他没有必要记头指针,没有必要记尾指针。我只要记一个指针放在这就可以了。在这点上依然要比ConcurrentLinkedQueue要快。
那我生产者线程生产的特别多,消费者没来得及消费那我在往后覆盖的话怎么办?不会那么轻易的让你覆盖的,我们是有策略的,我生产者生产满了,要在生产一个的话就马上覆盖这个位置上的数了。这时候是不能覆盖的,指定了一个策略叫等待策略,这里面有8中等待策略,分情况自己去用。最常见的是BlockingWait,满了我就在这等着,什么时候你空了消费者来唤醒一下就继续。
Disruptor开发步骤1.引入Maven依赖
com.lmax disruptor 3.3.6
2.定义Event-队列中需要处理的元素。
在Disruptor他是每一个消息都认为是一个事件,在他这个概念里就是一个事件,所以在这个环形队列里面存的是一个一个的Event。
重写了toString方法,输出对象的时候增加可读性。
public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" +
"value=" + value +
'}';
}
}
3.定义Event工厂,用于填充队列
指定Event的工厂生产LongEvent,实现EventFactory接口重写newInstance方法。
这里牵扯效率问题,因为Disruptor初始化的时候会调用Event工厂,对ringBuffer进行内存的提前分配,GC频率会降低。
import com.lmax.disruptor.EventFactory; public class LongEventFactory implements EventFactory{ @Override public LongEvent newInstance() { return new LongEvent(); } }
4.定义EventHandler(消费者),处理容器中的元素
那这个Event怎么消费呢,就需要指定Event的消费者EventHandler。
重写EventHandler接口中的onEvent方法
import com.lmax.disruptor.EventHandler; public class LongEventHandler implements EventHandler{ public static long count = 0; @Override public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { count ++; System.out.println("[" + Thread.currentThread().getName() + "]:" + event + " :序号:" + sequence); } }
小结:我们现在有一个环,然后这个环上每一个位置装LongEvent,怎么产生这个LongEvent通过这个LongEventFactory的newInstance方法来产生,当我拿到这个Event之后通过LongEventHandler进行处理。
public class Main01
{
public static void main(String[] args) throws Exception
{
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor factory产生消息的工厂 defaultThreadFactory线
//程工厂,指的是当他要产生消费者的时候,当要调用这个消费者的时候他是在一个特定的线程里执行
//的,这个线程就是通过defaultThreadFactory来产生;
Disruptor disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory());
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer ringBuffer = disruptor.getRingBuffer();//获取环形数组
//官方例程
long sequence = ringBuffer.next(); // Grab the next sequence 找到下一个位置下标
try
{
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor 获取预先初始化好的event
// for the sequence
event.set(8888L); // Fill with data 填充数据
}先找到下标 然后把这个位置上的event拿出来再填充数据 然后发布消息
finally
{
ringBuffer.publish(sequence);//生产者发布
}
}
}
02小程序使用translator,就是怎么样构建这个消息,原来我们都是用消息的factory,但是
下面这次我们用translator对他进行构建,就是把某一些数据翻译成消息。前面产生event工厂还是一样,然后bufferSize,后面再扔的是DaemonThreadFactory就是后台线程了,new LongEventHandler然后start拿到他的ringBuffer,前面都一样。只有一个地方叫EventTranslator不一样,我们在main01里面的代码是要写try catch然后把里面的值给设好,相当于把这个值转换成event对象。相对简单的写法,它会把某些值转成一个LongEvent,通过EventTranslator。new出来后实现了translateTo方法,EventTranslator他本身是一个接口,所以你要new的时候你又要实现它里面没有实现的方法,
translateTo的意思是你给我一个Event,我会把这个Event给你填好。
ringBuffer.publishEvent(translator1) 你只要把translator1交个ringBuffer就可以了。这个translator就是为了迎合Lambda表达式的写法(为java8的写法做准备)
EventTranslatorOneArg只有带一个参数的EventTranslator。我带有一个参数,这个参数会通过我的translateTo方法转换成一个LongEvent
既然有EventTranslatorOneArg就有EventTranslatorTwoArg、EventTranslatorThreeArg,还有EventTranslatorVararg多了去了Vararg就是有好多个值,我把里面的值全都给你加起来最后把结果set到event里面。
public class Main02
{
public static void main(String[] args) throws Exception
{
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer ringBuffer = disruptor.getRingBuffer();
//=============================================================== 为Java8的写法做准备
EventTranslator translator1 = new EventTranslator() {//用translator对他进行构建,就是把某一些数据翻译成消息
@Override
public void translateTo(LongEvent event, long sequence) {
event.set(8888L);
}
};
ringBuffer.publishEvent(translator1);
//this.sequencer.publish(sequence);
//===============================================================
EventTranslatorOneArg translator2 = new EventTranslatorOneArg() {
@Override
public void translateTo(LongEvent event, long sequence, Long l) {
event.set(l);
}
};
ringBuffer.publishEvent(translator2, 7777L);
//===============================================================
EventTranslatorTwoArg translator3 = new EventTranslatorTwoArg() {
@Override
public void translateTo(LongEvent event, long sequence, Long l1, Long l2) {
event.set(l1 + l2);
}
};
ringBuffer.publishEvent(translator3, 10000L, 10000L);
//===============================================================
EventTranslatorThreeArg translator4 = new EventTranslatorThreeArg() {
@Override
public void translateTo(LongEvent event, long sequence, Long l1, Long l2, Long l3) {
event.set(l1 + l2 + l3);
}
};
ringBuffer.publishEvent(translator4, 10000L, 10000L, 1000L);
//===============================================================
EventTranslatorVararg translator5 = new EventTranslatorVararg() {
@Override
public void translateTo(LongEvent event, long sequence, Object... objects) {
long result = 0;
for(Object o : objects) {
long l = (Long)o;
result += l;
}
event.set(result);
}
};
ringBuffer.publishEvent(translator5, 10000L, 10000L, 10000L, 10000L);
}
}
Lambda表达式怎么写,这个是比较简洁的写法,连factory都省了,直接指定一个Lambda表达式LongEvent::new。继续handleEventsWith把三个参数传进来后面写好Lambda表达式直接打印,然后start, 接着RingBuffer,publishEvent原来我们还有写try…catch,现在简单了直接ringBuffer.publishEvent(第一个是lambda表达式,表达式后是你指定的几个参数),所以现在的这种写法就不定义各种各样的EventTranslator了。
public class Main03
{
public static void main(String[] args) throws Exception
{
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer ringBuffer = disruptor.getRingBuffer();
//ringBuffer.publishEvent((event, sequence) -> event.set(10000L));
ringBuffer.publishEvent((event, sequence, l) -> event.set(l), 10000L);
ringBuffer.publishEvent((event, sequence, l1, l2) -> event.set(l1 + l2),
10000L, 10000L);
System.in.read();
}
}
ProducerType生产者线程模式
- ProducerType有两种模式ProducerMULTI和Producer.SINGLE
- 默认是MULTI,表示在多线程模式下产生sequence
- 如果确认是单线程生产者,那么可以指定SINGLE,效率会提升
- 如果是多个生产者(多线程),但模式指定为SINGLE,会出什么问题?
假如你的程序里头只有一个生产者还用ProducerMULTI的话,我们对序列来进行多线程访问的时候肯定是要加锁的,所以MULTI里面默认是有锁定处理的,但是假如你只有一个线程这个时候应该吧生产者指定为SINGLE,他的效率更高,因为它里面不加锁。
指定的是Producer.SINGLE,但是我生产的时候用的是一堆线程,当我制定了Producer.SINGLE之后相当于内部对于序列的访问就没有锁了,它会把性能发挥到极致,它不会报错,它会把你的消息静悄悄的覆盖了,因此你要小心一点。我这里这个写法是我有50 个线程然后每个线程生产100个数,最后结果正常的话应该是有5000个消费产生。
单线程不加锁,最后小于5000
public class Main04_ProducerType
{
public static void main(String[] args) throws Exception
{
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
// Disruptor disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory());
//指定单线程模式
Disruptor disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(),
ProducerType.SINGLE, new BlockingWaitStrategy());//ProducerType.SINGLE ProducerType.MULTI
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer ringBuffer = disruptor.getRingBuffer();
//================================================================================================
final int threadCount = 50;
CyclicBarrier barrier = new CyclicBarrier(threadCount);
ExecutorService service = Executors.newCachedThreadPool();
for (long i = 0; i < threadCount; i++) {
final long threadNum = i;
service.submit(()-> {
System.out.printf("Thread %s ready to start!n", threadNum );
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
for (int j = 0; j < 100; j++) {
ringBuffer.publishEvent((event, sequence) -> {
event.set(threadNum);
System.out.println("生产了 " + threadNum);
});
}
});
}
service.shutdown();
//disruptor.shutdown();
TimeUnit.SECONDS.sleep(3);
System.out.println(LongEventHandler.count);
}
}
等待策略
生产者的生产速度大于消费者的消费速度,生产者要等待消费者消费消息。
生产者线程生产的特别多,消费者没来得及消费那我在往后覆盖的话怎么办?不会那么轻易的让你覆盖的,我们是有策略的,我生产者生产满了,要在生产一个的话就马上覆盖这个位置上的数了。这时候是不能覆盖的,指定了一个策略叫等待策略,这里面有8中等待策略,分情况自己去用。最常见的是BlockingWait,满了我就在这等着,什么时候你空了消费者来唤醒一下就继续。
- (常用)BlockingWaitStrategy:通过线程堵塞的方式,等待生产者唤醒,被唤醒后,再循环检依赖的sequence是否已经消费。
- BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu
- LiteBlockingWaitStrategy:线程阻塞等待生产者唤醒,与BlockingWaitStrategy相比,区别在signalNeeded.getAndSet,如果两个线程同时访问一个访问waitfor,一个访问signalAll时,可减少lock加锁次数
- LiteTimeoutBlockingWaitStrategy:与LiteBlockingWaitStrategy相比,设置了阻塞时间,超过时间后抛出异常
- PhasedBackoffWaitStrategy:根据时间参数和传入的等待策略来决定使用那种等待策略
- TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛出异常
- (常用)YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu
- (常用)SleepingWaitStrategy:sleep
BlockingWaitStrategy满了就等着;SleepingWaitStrategy满了就睡一觉,睡醒了看看能不
能继续执行了;YieldingWaitStrategy让出cpu,让你消费者赶紧消费,消费完了之后我又回来看看我是不是又能生产了;一般YieldingWaitStrategy效率是最高的,但也要看实际情况适用不适用。
50个线程,每个线程生产100个消息,然后5000个被消费
public class Main05_WaitStrategy
{
public static void main(String[] args) throws Exception
{
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(),
ProducerType.MULTI, new SleepingWaitStrategy());
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer ringBuffer = disruptor.getRingBuffer();
//================================================================================================
final int threadCount = 50;
CyclicBarrier barrier = new CyclicBarrier(threadCount);
ExecutorService service = Executors.newCachedThreadPool();
for (long i = 0; i < threadCount; i++) {
final long threadNum = i;
service.submit(()-> {
System.out.printf("Thread %s ready to start!n", threadNum );
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
for (int j = 0; j < 100; j++) {
ringBuffer.publishEvent((event, sequence) -> {
event.set(threadNum);
System.out.println("生产了 " + threadNum);
});
}
});
}
service.shutdown();
//disruptor.shutdown();
TimeUnit.SECONDS.sleep(3);
System.out.println("LongEventHandler.count:"+LongEventHandler.count);
}
}
多个消费者
public class Main06_MultiConsumer
{
public static void main(String[] args) throws Exception
{
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(),
ProducerType.MULTI, new SleepingWaitStrategy());
// Connect the handlers
LongEventHandler h1 = new LongEventHandler();
LongEventHandler h2 = new LongEventHandler();
disruptor.handleEventsWith(h1, h2);
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer ringBuffer = disruptor.getRingBuffer();
//================================================================================================
final int threadCount = 10;
CyclicBarrier barrier = new CyclicBarrier(threadCount);
ExecutorService service = Executors.newCachedThreadPool();
for (long i = 0; i < threadCount; i++) {
final long threadNum = i;
service.submit(()-> {
System.out.printf("Thread %s ready to start!n", threadNum );
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
for (int j = 0; j < 10; j++) {
ringBuffer.publishEvent((event, sequence) -> {
event.set(threadNum);
System.out.println("生产了 " + threadNum);
});
}
});
}
service.shutdown();
//disruptor.shutdown();
TimeUnit.SECONDS.sleep(3);
System.out.println(LongEventHandler.count);//200 因为一个消息被一个消费者分别消费一次
}
}
消费者异常处理
默认:disruptor.setDefaultExceptionHandler()
覆盖:disruptor.handleExceptionFor().with()
这里方法里写了一个EventHandler是我们的消费者,在消费者里打印了event之后马上抛出了异常,当我们消费者出现异常之后你不能让整个线程停下来,有一个消费者出了异常那其他的消费者就不干活了,肯定不行。handleExceptionsFor为消费者指定Exception处理器 (h1).with后面是我们的ExceptionHandler出了异常之后该怎么办进行处理,重写三个方法,第一个是当产生异常的时候在这很简单直接打印出来了;第二个是handleOnStart如果启动的时候出异常;第三个handleOnShutdown你该怎么处理。
public class Main07_ExceptionHandler
{
public static void main(String[] args) throws Exception
{
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(),
ProducerType.MULTI, new SleepingWaitStrategy());
// Connect the handlers
EventHandler h1 = (event, sequence, end) -> {
System.out.println(event);
throw new Exception("消费者出异常");
};
disruptor.handleEventsWith(h1);
disruptor.handleExceptionsFor(h1).with(new ExceptionHandler() {
@Override
public void handleEventException(Throwable throwable, long l, LongEvent longEvent) {
throwable.printStackTrace();
}
@Override
public void handleOnStartException(Throwable throwable) {
System.out.println("Exception Start to Handle!");
}
@Override
public void handleOnShutdownException(Throwable throwable) {
System.out.println("Exception Handled!");
}
});
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer ringBuffer = disruptor.getRingBuffer();
//================================================================================================
final int threadCount = 1;
CyclicBarrier barrier = new CyclicBarrier(threadCount);
ExecutorService service = Executors.newCachedThreadPool();
for (long i = 0; i < threadCount; i++) {
final long threadNum = i;
service.submit(()-> {
System.out.printf("Thread %s ready to start!n", threadNum );
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
for (int j = 0; j < 1; j++) {
ringBuffer.publishEvent((event, sequence) -> {
event.set(threadNum);
System.out.println("生产了 " + threadNum);
});
}
});
}
service.shutdown();
//disruptor.shutdown();
TimeUnit.SECONDS.sleep(3);
System.out.println(LongEventHandler.count);
}
}
总结
disruptor是一个环,然后这个环有多个生产者可以往里头生产,由于它是环形的设计效率会非常的高,我们写程序的时候是这样写的,首先你自己定义好Event消息的格式,然后定义消息工厂,消息工厂是用来初始化整个环的时候相应的一些位置上各种各样不同的消息先把它new出来,new出来之后先占好空间,我们在生产的时候只需要把这个位置上这个默认的这块空间拿出来往里头填值,填好值之后消费者就可以往里头消费了,消费完了生产者就可以继续往里头生产了,如果说你生产者消费的比较快,消费着消费的比较慢,满了怎么办,就是用各种各样的等待策略,消费者出了问题之后可以用ExceptionHandler来进行处理。



