前两篇Disruptor学习笔记:基本使用、核心概念和原理和Netty整合Disruptor实战文章中,主要讲解了Disruptor的一些API的使用、概念等,本文会进一步来解析Disruptor的核心原理和相关源码
1、Disruptor的性能为什么这么高?- 内存分配更加合理,使用RingBuffer数据结构,数组元素在初始化时一次性全部创建,提升缓存命中率;对象循环利用,避免频繁GC
- 能够避免伪共享,提升缓存利用率
- 采用无锁算法,避免频繁加锁、解锁的性能消耗(CAS操作代替锁)
- 支持批量消费,消费者可以无锁方式消费多个消息
Disruptor是使用RingBuffer作为数据存储,RingBuffer是一个首位相接的环,用于在不同上下文(线程)间传递数据的buffer
RingBuffer拥有一个序号,这个序号指向数组中下一个可用元素
随着你不停地填充这个buffer(可能也会有相应的读取),这个序号会一直增长,直到绕过这个环
要找到数组中当前序号指向的元素,可以通过mod操作:sequence mod array length = array index
以上面的RingBuffer为例:12 % 10 = 2
事实上,上图中的RingBuffer只有10个槽完全是个意外。如果槽的个数是2的N次方更有利于基于二进制的计算机进行计算
2)、RingBuffer如何提升性能主要有以下两点:
- 使用数组实现,对CPU缓存友好
- 数组预先分配内存,数组对象一直存在,避免频繁创建、删除Event导致的频繁GC问题
1)使用数组实现,对CPU缓存友好
CPU从内存加载数据到CPU Cache里面的时候,不是一个变量一个变量加载的,而是加载固定长度的Cache Line。如果是加载数组里面的数据,那么CPU就会加载到数组里面连续的多个数据
RingBuffer也是用数组实现的,但是这个数组中的所有元素在初始化时是一次性全部创建的,所以这些元素的内存地址大概率是连续的
abstract class RingBufferFieldsextends RingBufferPad { private final Object[] entries; RingBufferFields( EventFactory eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; // BUFFER_PAD为数组填充大小,避免数组的有效元素出现伪共享 // 数组的前X个元素会出现伪共享和后X个元素可能会出现伪共享,可能和无关数据加载到同一个缓存行 // 额外创建2个填充空间的大小,首尾填充,避免数组的有效载荷和其它成员加载到同一缓存行 this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; fill(eventFactory); } private void fill(EventFactory eventFactory) { for (int i = 0; i < bufferSize; i++) { // BUFFER_PAD+i为真正的数组索引 entries[BUFFER_PAD + i] = eventFactory.newInstance(); } }
数组中所有元素内存地址连续能提升性能吗?能!为什么呢?因为消费者线程在消费的时候,是遵循空间局部性原理的,消费完第1个元素,很快就会消费第2个元素;当消费第1个元素E1的时候,CPU会把内存中E1后面的数据也加载进Cache,如果E1和E2在内存中的地址是连续的,那么E2也就会被加载进Cache中,然后当消费第2个元素的时候,由于E2已经在Cache中了,所以就不需要从内存中加载了,这样就能大大提升性能
Q:为什么创建元素的时间离散会导致元素的内存地址不是连续的?这些元素不是存在数组中的吗?数组初始化不是已经连续分配内存了吗?
A:数组连续,数组里只有引用,E1、E2这些对象的地址不连续。基础类型的数组,数组是连续的,数组里的值也是连续的;若是对象数组,则数组引用是连续的,数组里存放的元素内存地址是离散的
2)数组预先分配内存,数组对象一直存在,避免频繁创建、删除Event导致的频繁GC问题
在Disruptor中,生产者线程通过publishEvent()发布Event的时候,并不是创建一个新的Event,而是通过event.set()方法修改Event, 也就是说RingBuffer创建的Event是可以循环利用的,这样还能避免频繁创建、删除 Event导致的频繁GC问题
3、如何避免伪共享 1)、什么是伪共享数据X、Y、Z被加载到同一Cache Line中,线程A在Core1上修改X,而修改X会导致其所在的所有核上的缓存行均失效;假设此时线程B在Core2上读取Y,由于X所在的缓存行已经失效,所有Core2必须从内存中重新读取。线程A的操作不会修改Y,但是由于X和Y共享的是一个缓存行,就导致线程B不能很好地利用Cache,这其实就是伪共享。简单来说,伪共享指的是由于共享缓存行导致缓存无效的场景
2)、Disruptor中如何避免伪共享Disruptor是采用缓存行填充的方法来避免伪共享的
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
protected volatile long value;
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding
{
Sequence对象中的value属性就能避免伪共享,因为这个属性前后都填充了56个字节。56字节(前)、8字节(value)、56字节(后)。64字节一个Cache Line的话不管怎样切都能保证value在一个Cache Line
3)、Contended注解方式在JDK1.8中,新增了一种注解@sun.misc.Contended来使各个变量在Cache Line中分隔开。其原理是在使用此注解的对象或字段的前后各增加128字节大小的padding,使用2倍于大多数硬件缓存行的大小来避免相邻扇区预取导致的伪共享冲突。可以在类前或属性前加上此注释:
// 类前加上代表整个类的每个变量都会在单独的Cache Line中
@sun.misc.Contended
@SuppressWarnings("restriction")
public class ContendedData {
int value;
long modifyTime;
boolean flag;
long createTime;
char key;
}
或者:
// 属性前加上时需要加上组标签
@SuppressWarnings("restriction")
public class ContendedGroupData {
@sun.misc.Contended("group1")
int value;
@sun.misc.Contended("group1")
long modifyTime;
@sun.misc.Contended("group2")
boolean flag;
@sun.misc.Contended("group3")
long createTime;
@sun.misc.Contended("group3")
char key;
}
采取上述措施图示:
在默认情况下,@Contended注解只用于Java核心类,比如rt包下的类。 如果用户类路径下的类需要使用这个注解,则需要添加JVM参数:-XX:-RestrictContended。填充的宽度默认为128,要自定义宽度则可以设置-XX:ContendedPaddingWidth参数
4、Disruptor中生产消费模型RingBuffer中每格中都有序号,并且RingBuffer实时监测值最大(最新)的序号,该序号指向RingBuffer中最后一格。RingBuffer采用的是对比序号的方式实现了生产者和消费者之间的资源协调
1)、生产入队示例代码:
public class OrderEventProducer {
private RingBuffer ringBuffer;
public OrderEventProducer(RingBuffer ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(Long price) {
// 1.在生产者发送消息的时候,首先需要从ringBuffer中获取一个可用的序号
long sequence = ringBuffer.next();
try {
// 2.根据这个序号,找到具体的OrderEvent元素 注意:此时获取的OrderEvent对象是一个属性没有被赋值的对象
OrderEvent orderEvent = ringBuffer.get(sequence);
// 3.进行实际的赋值
orderEvent.setPrice(price);
} finally {
// 4.提交发布操作
ringBuffer.publish(sequence);
}
}
}
生产者向RingBuffer中写入数据需要通过两阶段提交。首先,生产者必须确定RingBuffer中下一个可以插入的格(示例代码中的ringBuffer.next()),如下图所示:
RingBuffer持有最近写入格的序号(上图中的18格),从而确定下一个插入格的序号
RingBuffer通过检查所有事件处理器正在从RingBuffer中读取的当前序号来判断下一个插入格是否空闲
当生产者得到下一个序号后,它可以获得该格中的对象,并可以对该对象进行赋值操作
同时,在生产者处理19格数据的时候,RingBuffer的序号依然是18,所以其他事件处理器将不会读到19格中的数据
最终,发布者最终将数据写入19格后,通知RingBuffer发布19格的数据(示例代码中的ringBuffer.publish(sequence))。这时,RingBuffer更新序号并且所有从RingBuffer读数据的事件处理器都可以看到19格中的数据
2)、消费出队当发布者向RingBuffer请求下一个空格来写入时,事件处理器(BatchEventProcessor)将监控它处理的最新的序号并请求它所需要的下一个序号
事件处理器不是直接向RingBuffer请求序号,而是通过SequenceBarrier向RingBuffer请求序号
如图上图所示,事件处理器的最大序号是16。它向SequenceBarrier调用waitFor(17)以获得17格中的数据。因为没有数据写入RingBuffer,事件处理器挂起等待下一个序号。如果这样,没有什么可以处理。但是,如图上图所示的情况,RingBuffer已经被填充到18格,所以waitFor函数将返回18并通知事件处理器,它可以读取包括直到18格在内的数据,如图下图所示
5、Disruptor核心类- RingBuffer:Disruptor最主要的组件,仅仅负责存储和更新事件对象
- Sequence:Disruptor使用Sequence来表示一个特殊组件处理的序号。和Disruptor一样,每一个消费者(EventProcessor)都维持着一个Sequence。大部分的并发代码依赖这这个值。这个类维护了一个long类型的value,采用的unsafe进行的更新操作
- Sequencer:这是Disruptor真正的核心。实现了这个接口的两种生产者(单生产者和多生产者)均实现了所有的并发算法,为了在生产者和消费者之间进行准确快速的数据传递
- SequenceBarrier:由Sequencer生成,并且包含了已经发布的Sequence的引用,这些Sequence源于Sequencer和一些独立的消费者的Sequence。它包含了决定是否有供消费者消费的Event的逻辑。用来权衡当消费者无法从RingBuffer里面获取事件时的处理策略(例如:当生产者太慢,消费者太快,会导致消费者获取不到新的事件会根据该策略进行处理,默认会堵塞)
- WaitStrategy:决定一个消费者将如何等待生产者将Event置入Disruptor的策略。用来权衡当生产者无法将新的事件放进RingBuffer时的处理策略(例如:当生产者太快,消费者太慢,会导致生产者获取不到新的事件槽来插入新事件,则会根据该策略进行处理,默认会堵塞)
- Event:从生产者到消费者过程中所处理的数据单元。Disruptor中没有代码表示Event,因为它完全是由用户定义的
- EventProcessor:主要事件循环,处理Disruptor中的Event,并且拥有消费者的Sequence。它有一个实现类是BatchEventProcessor,包含了event loop有效的实现,并且将回调到一个EventHandler接口的实现对象
- EventHandler:由用户实现并且代表了Disruptor中的一个消费者的接口
- WorkHandler:在work模式下使用。由用户实现并且代表了Disruptor中的多个消费者的接口
- WorkProcessor:确保每个sequence只被一个processor消费,在同一个WorkPool中的处理多个WorkProcessor不会消费同样的sequence
- WorkerPool:一个WorkProcessor池,其中WorkProcessor将消费Sequence,所以任务可以在实现WorkHandler接口的worker之间移交
- LifecycleAware:当BatchEventProcessor启动和停止时,实现这个接口用于接收通知
上面我们说到,生产入队操作二阶段提交,先调用RingBuffer的next(int n)方法获取一个可用的序号,修改完Event元素后再调用RingBuffer的publish(long sequence)方法提交发布操作,代码如下:
abstract class RingBufferFieldsextends RingBufferPad { protected final Sequencer sequencer; public final class RingBuffer extends RingBufferFields implements Cursored, EventSequencer , EventSink { @Override public long next(int n) { return sequencer.next(n); } @Override public void publish(long sequence) { sequencer.publish(sequence); }
RingBuffer的这两个方法最终调用了Sequencer的对应方法,Sequencer继承关系图如下:
单生产者模式下会由SingleProducerSequencer来处理,多生产者模式下会由MultiProducerSequencer来处理
1)SingleProducerSequencer单线程事件发布者
public final class SingleProducerSequencer extends SingleProducerSequencerFields
{
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
// 已分配的序号的缓存(已分配到这里),初始-1
long nextValue = this.nextValue;
// 本次申请分配的序号
long nextSequence = nextValue + n;
// 构成环路的点:环形缓冲区可能追尾的点 = 本次申请的序号 - 环形缓冲区大小
// 如果该序号大于最慢消费者的进度,那么表示追尾了,需要等待
long wrapPoint = nextSequence - bufferSize;
// 上次缓存的最小网关序号(消费最慢的消费者的进度)
long cachedGatingSequence = this.cachedValue;
// wrapPoint > cachedGatingSequence 表示生产者追上消费者产生环路(追尾),即缓冲区已满,此时需要获取消费者们最新的进度,以确定是否队列满
// cachedGatingSequence > nextValue 表示消费者的进度大于生产者进度,nextValue无效,正常情况下不会出现
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
// 插入StoreLoad内存屏障/栅栏,保证可见性
// 因为publish使用的是set()/putOrderedLong,并不保证其他消费者能及时看见发布的数据
// 当我再次申请更多的空间时,必须保证消费者能消费发布的数据
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
// 如果末端的消费者们仍然没让出该插槽则等待,直到消费者们让出该插槽
// 注意:这是导致死锁的重要原因!
// 死锁分析:如果消费者挂掉了,而它的sequence没有从gatingSequences中删除的话,则生产者会死锁,它永远等不到消费者更新
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
// 缓存生产者们最新的消费进度
// (该值可能是大于wrapPoint的那么如果下一次的wrapPoint小于等于cachedValue则可以直接进行分配)
// 比如:我可能需要一个插槽位置,结果突然直接消费者们让出来3个插槽位置
this.cachedValue = minSequence;
}
// 这里只写了缓存,并未写volatile变量,因为只是预分配了空间但是并未被发布数据,不需要让其他消费者感知到
// 消费者只会感知到真正被发布的序号
this.nextValue = nextSequence;
return nextSequence;
}
@Override
public void publish(long sequence)
{
// 更新发布进度,使用的是set(putOrderedLong),并没有保证对其他线程立即可见(最终会看见)
// 在下一次申请更多的空间时,如果发现需要消费者加快消费,则必须保证数据对消费者可见
cursor.set(sequence);
// 唤醒阻塞的消费者们(事件处理器们)
waitStrategy.signalAllWhenBlocking();
}
2)MultiProducerSequencer多线程事件发布者
public final class MultiProducerSequencer extends AbstractSequencer
{
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
// 使用缓存增加了复杂度
do
{
current = cursor.get();
next = current + n;
// 可能构成环路的点/环形缓冲区可能追尾的点 = 请求的序号 - 环形缓冲区大小
long wrapPoint = next - bufferSize;
// 缓存的消费者们的最慢进度值,小于等于真实进度
long cachedGatingSequence = gatingSequenceCache.get();
// 第一步:空间不足就继续等待
// 1.wrapPoint > cachedGatingSequence 表示生产者追上消费者产生环路,上次看见的序号缓存无效,即缓冲区已满,此时需要获取消费者们最新的进度,以确定是否队列满
// 2.cachedGatingSequence > current 表示消费者的进度大于当前生产者进度,表示current无效,有以下可能:
// 2.1 其它生产者发布了数据,并更新了gatingSequenceCache,并已被消费(当前线程进入该方法时可能被挂起,重新恢复调度时看见一个更大值)
// 2.2 claim的调用(建议忽略)
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
// 走进这里表示cachedGatingSequence过期或current过期,此时都需要获取最新的gatingSequence
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
// 消费者最新的进度仍然与我构成了环路,那么只能重试
if (wrapPoint > gatingSequence)
{
// wrapPoint > gatingSequence 意外着gatingSequence无效,因为生产者期待的是一个大于等于wrapPoint的值,因此也就不更新缓存
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
// 检测到未构成环路(多线程下这都是假设条件),更新网关序列,然后进行重试
// 这里存在竞态条件,多线程模式下,可能会被设置为多个线程看见的结果中的任意一个,可能会被设置为一个更小的值,从而小于当前的查询值
gatingSequenceCache.set(gatingSequence);
// 这里看见有足够空间,这里如果尝试竞争空间会产生重复的代码,其实就是外层的代码,因此直接等待下一个循环
}
// 第二步:看见空间足够时尝试CAS竞争空间
else if (cursor.compareAndSet(current, next))
{
// 第三步:成功竞争到了这片空间,返回
// 注意!这里更新了生产者进度,然而生产者并未真正发布数据
// 因此消费者需要调用getHighestPublishedSequence()确认真正的可用空间
break;
}
// 第三步:竞争失败则重试
}
while (true);
return next;
}
@Override
public void publish(final long sequence)
{
// 设置目标插槽上的数据可用,将对应插槽上的标记置为可用标记
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking();
}
SingleProducerSequencer和MultiProducerSequencer的区别:
- SingleProducerSequencer内部维护cachedValue(事件消费者序列),nextValue(事件发布者序列)。并且采用padding填充。这个类是线程不安全的
- MultiProducerSequencer每次获取序列都是从Sequence中获取的。Sequence中针对value的操作都是原子的
EventProcessor负责事件处理,有两个实现类:BatchEventProcessor和WorkProcessor
BatchEventProcessor为批量事件处理器,一个单线程的消费者;WorkProcessor为WorkPool消费者里面的事件处理单元,不是消费者,只是一个消费者里面的一个工作单元,多个WorkProcessor协作构成WorkPool消费者
1)BatchEventProcessor event模式单线程处理
public final class BatchEventProcessorimplements EventProcessor { private void processEvents() { T event = null; // 下一个消费的序号,-1到0 // -1是不需要消费的,第一个要消费的是0 long nextSequence = sequence.get() + 1L; // 死循环,因此不会让出线,需要独立的线程(每一个EventProcessor都需要独立的线程) while (true) { try { // 通过sequenceBarrier获取到的最大可用序号 final long availableSequence = sequenceBarrier.waitFor(nextSequence); if (batchStartAware != null) { // 批量处理事件开始时发送通知 batchStartAware.onBatchStart(availableSequence - nextSequence + 1); } // 批量消费,由于没有其它事件处理器和我竞争序号,这些序号我都是可以消费的 while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } // 更新消费进度(批量消费,每次消费只更新一次Sequence,减少性能消耗) // availableSequence理论上可能小于nextSequence,也就是可能是无效的,因此应该只在成功消费了事件之后更新 sequence.set(availableSequence); } catch (final TimeoutException e) { // 等待sequence超时,进行重试 notifyTimeout(sequence.get()); } catch (final alertException ex) { // 检查到中断/停止请求,如果发现已经不是运行状态,则退出while死循环 if (running.get() != RUNNING) { break; } } catch (final Throwable ex) { // 警告:如果在处理异常时抛出新的异常,会导致跳出while循,导致BatchEventProcessor停止工作,可能导致死锁 // 而系统默认的异常处理会将其包装为RuntimeException exceptionHandler.handleEventException(ex, nextSequence, event); // 成功处理异常后标记当前事件已被处理 // 警告:如果自己实现的等待策略,抛出了TimeoutException、alertException以外的异常,从而走到这里,将导致该sequence被跳过! // 从而导致数据/信号丢失!严重bug! // 严格的说,lmax这里的实现对于扩展并不是特别的安全,安全一点的话,使用两个try块更加安全,一个try块负责获取availableSequence,第二个try块负责事件处理 sequence.set(nextSequence); nextSequence++; } } }
2)WorkProcessor work模式多线程处理
public final class WorkProcessorimplements EventProcessor { private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); private final Sequence workSequence; @Override public void run() { if (!running.compareAndSet(false, true)) { throw new IllegalStateException("Thread is already running"); } // 清除特定状态(可理解为清除线程的中断状态) sequenceBarrier.clearalert(); notifyStart(); // 是否处理了一个事件 在处理完一个事件之后会再次竞争序号进行消费 boolean processedSequence = true; // 看见的已发布序号的缓存,这里是局部变量,在该变量上无竞争 long cachedAvailableSequence = Long.MIN_VALUE; // 下一个要消费的序号(要消费的事件编号),注意起始为-1,注意与BatchEventProcessor的区别 // BatchEventProcessor初始值为sequence.get()+1 // 存为local variable还减少大量的volatile变量读,且保证本次操作过程中的一致性 long nextSequence = sequence.get(); // 要消费的事件对象 T event = null; while (true) { try { // 如果前一个事情被成功处理了 拉取下一个序号,并将上一个序号标记为已成功处理 // 这可以防止当workHandler抛出异常时,sequence跨度太大 if (processedSequence) { processedSequence = false; do { // 获取workProcessor所属的消费者的进度,与workSequence同步(感知其他消费者的进度) nextSequence = workSequence.get() + 1L; sequence.set(nextSequence - 1L); } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence)); // CAS更新workSequence的序号(预分配序号),为什么这样是安全的呢? // 由于消费者的进度由最小的sequence决定,当它CAS更新workSequence之后,它代替了workSequence处在旧的进度上 // 就算多个workProcessor竞争,总有一个是处在正确的进度上的 因此workSequence的更新并不会影响workerPool代表的消费者的消费进度 } // 它只能保证竞争到的序号是可用的,因此只能只消费一个 // 而BatchEventProcessor看见的所有序号都是可用的 if (cachedAvailableSequence >= nextSequence) { event = ringBuffer.get(nextSequence); workHandler.onEvent(event); processedSequence = true; } else { // 等待生产者进行生产,这里和BatchEventProcessor不同 // 如果waitFor抛出TimeoutException、Throwable以外的异常,那么cachedAvailableSequence不会被更新, // 也就不会导致nextSequence被标记为已消费! cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); } } catch (final TimeoutException e) { notifyTimeout(sequence.get()); } catch (final alertException ex) { if (!running.get()) { break; } } catch (final Throwable ex) { // 同样的警告!如果在处理异常时抛出新的异常,会导致跳出while循环,导致WorkProcessor停止工作,可能导致死锁 // 而系统默认的异常处理会将其包装为RuntimeException!!! exceptionHandler.handleEventException(ex, nextSequence, event); // 成功处理异常后标记当前事件已被消费 processedSequence = true; } } notifyShutdown(); // 写volatile,插入StoreLoad屏障,保证其他线程能看见我退出前的所有操作 running.set(false); }
WorkProcessor中有两个Sequence。sequence代表当前workProcessor的处理进度,即上一次处理的序号,生产者要投递数据时,会去循环workerPool中所有workProcessor的sequence,比较当前sequence最小的值(消费者们的最慢进度值),如果生产者申请序号- 环形缓冲区大小>消费者们的最慢进度值,则不允许入队。workSequence是所有workerPool中的workProcessor共用的,用来预分配(抢占)序号用的,竞争成功表示告诉其他workProcessor去消费下一个序号
public final class WorkerPool{ private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
参考:
剖析Disruptor:为什么会这么快?(一)Ringbuffer的特别之处
40 | 案例分析(三):高性能队列Disruptor
54 | 理解Disruptor(上):带你体会CPU高速缓存的风驰电掣
55 | 理解Disruptor(下):不需要换挡和踩刹车的CPU,有多快?
线程间共享数据无需竞争
Disruptor源码解析 + 实战
Disruptor源码注释版:https://github.com/hl845740757/disruptor-translation



