栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Disruptor深入理解Ringbuffer

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Disruptor深入理解Ringbuffer

        上一篇从log4j2引出了Disruptor,由Log4j2日志框架到Disruptor入门 简单介绍了他的特性,这一篇咱们深入探讨下它的底层原理Ringbuffer。

        在剖析RingBuffer之前,有必要简单介绍几个概念:缓存行、伪共享、内存屏障,上一章讨论了锁对程序运行的影响以及Log框架是如果解决锁带来影响,最终引出Disruptor,因为它是一个开源的无锁并发框架,但即便规避了使用锁,在计算底层还是会出现共享资源访问问题,接下来解释下刚才提出的几个概念。

        缓存行:  CPU加载内存中的数据时并不是指谁就获取谁,多级缓存结构中,CPU会将要处理的数据的相邻数据一并读取到L1级缓存中,他们就是一个缓存行,老机器32k, 目前主流64k,也有128k的,Disputor是对64k缓存CPU做了优化

        

        伪共享:当两个CPU核心读取不同的数据,又恰好在同一个缓存行中,那么就会出现两个CPU加载了同一份数据到L1中,那么当两个CPU分别对数据进行修改时,都会通知另一个CPU将它的缓存失效并重新读取,这样互相失效重新读取就是“伪共享”

 

        内存屏障:它是一个CPU指令,正常情况下CPU会在不影响执行结果的情况下对指令重新排序以获得更优的性能,但是当在一段指令中增加了这个内存屏障,那么CPU会保证在该屏障指令之前的指令先执行,并且还有一个作用,之前的指令执行的结果刷新到内存,并通知其他CPU重新获取内存中最新数据

        例如:java中的volatile就是一个内存屏障,当volatile加在变量上,那么在修改和读取改变量之前会分别插入一个写屏障和读屏障,从而保证变量的可见性

        在介绍了缓存行、伪共享、内存屏障这几个概念后,再回到我们今天的主题RingBuffer,首先RingBuffer是环形数组(其实物理上不存在环形一说,只不过是一段固定长度的数组),我可以认为他是一个队列,一边读一边写,类似咱们的ArrayBlockingQueue,那他如何保证高效呢

        1、内存连续不回收,RingBuffer在初始化的时候就已经将数组全部填充完毕,因此他们是内存连续的且一直存在内存中,生产者只是会重新设置数组中对象的值,改变生产指针,而消费者只会根据指针读取数组中对应下标的值,这样就避免了垃圾回收

RingBufferFields(
        EventFactory eventFactory,
        Sequencer sequencer)
    {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1)
        {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1)
        {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.indexMask = bufferSize - 1;
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    }

// 将所有的环上的所有entry都采用eventFactory生成一个实例填充,因此该"环形"数组内存是连续的
private void fill(EventFactory eventFactory)
    {
        for (int i = 0; i < bufferSize; i++)
        {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

        2、因为内存连续,当需要生产消息和消费消息时,会接着处理下一块内存块,按上面所说的缓存行,相邻的数据会一次性加载到内存,因为避免了再次从内存获取,巧妙的利用了程序的局部性原理

        3、不管是生产者和消费者,他们都会维护一个指向内存块的指针,而这个指针又采用了缓存行填充,以至于生产和消费的同时进行且无需加锁,而他们的指针不会存在被加载到同一缓存行中,因此也就避免了伪共享问题

class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}


public class Sequence extends RhsPadding
{
    
}

        4、

        receiver: 接受消息并写入到31位置,会一直往32后面的数字递增写入,直到与14碰撞停止写

//生产者获取n个写入位置
do {
  //cursor类似于入队索引,指的是上次生产到这里
  current = cursor.get();
  //目标是在生产n个
  next = current + n;
  //减掉一个循环
  long wrapPoint = next - bufferSize;
  //获取上一次的最小消费位置
  long cachedGatingSequence = gatingSequenceCache.get();
  //没有足够的空余位置
  if (wrapPoint>cachedGatingSequence || cachedGatingSequence>current){
    //重新计算所有消费者里面的最小值位置
    long gatingSequence = Util.getMinimumSequence(
        gatingSequences, current);
    //仍然没有足够的空余位置,出让CPU使用权,重新执行下一循环
    if (wrapPoint > gatingSequence){
      LockSupport.parkNanos(1);
      continue;
    }
    //从新设置上一次的最小消费位置
    gatingSequenceCache.set(gatingSequence);
  } else if (cursor.compareAndSet(current, next)){
    //获取写入位置成功,跳出循环
    break;
  }
} while (true);

其他几个是消费者,没消费一个就将指针往下移动,知道消费到生产者的位置停下

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/759358.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号