栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

先进先出的高性能的有界内存队列Disruptor简介

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

先进先出的高性能的有界内存队列Disruptor简介

1.Disruptor简介      

        Disruptor是一个高性能的有界内存队列,它在 Apache Storm、Camel、Log4j 2 等很多知名项目中都有广泛应用。之所以如此受青睐,主要还是因为它的性能表现非常优秀。它比 Java 中另外一个非常常用的内存消息队列 ArrayBlockingQueue(ABS)的性能,要高一个数量级,可以算得上是最快的内存消息队列了。它还因此获得过 Oracle 官方的 Duke 大奖。

Disruptor 是如何做到如此高性能的?

使用 RingBuffer 数据结构,数组元素在初始化时一次性全部创建,提升缓存命中率;对象循环利用,避免频繁 GC。此外根据Index进行环形定位并非简单取模,而是使用位运算,效率更高,定位更快。前后56字节(7个long)缓存行填充的手法,使得每个变量独占一个缓存行,避免伪共享,提升CPU缓存利用率。采用CAS无锁算法,避免频繁加锁、解锁的性能消耗。

伪共享

由于共享缓存行导致缓存无效的场景。

伪共享和 CPU 内部的 Cache 有关,Cache 内部是按照缓存行(Cache Line)管理的,缓存行的大小通常是 64 个字节。CPU 的缓存就利用了程序的局部性原理:时间局部性(指的是程序中的某条指令一旦被执行,不久之后这条指令很可能再次被执行;如果某条数据被访问,不久之后这条数据很可能再次被访问。)、空间局部性(指某块内存一旦被访问,不久之后这块内存附近的内存也很可能被访问)。为了更好地利用缓存,我们必须避免伪共享,解决手法为:前后56字节(7个long)缓存行填充。

CAS

比较并交换(CompareAndSwap),本质上是无锁,不过也称之为自旋锁或者自旋。

2.使用简单示例

生产者生产的对象(也就是消费者消费的对象)称为 Event,使用 Disruptor 必须自定义 Event。构建 Disruptor 对象除了要指定队列大小外,还需要传入一个 EventFactory。消费 Disruptor 中的 Event 需要通过 handleEventsWith() 方法注册一个事件处理器,发布 Event 则需要通过 publishEvent() 或publish()方法。

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;


public class SimpleTest {
    public static void main(String[] args) {
        Executor executor = Executors.newCachedThreadPool();
        LongEventFactory longEventFactory = new LongEventFactory();
        int bufferSize = 256;
        Disruptor disruptor = new Disruptor(longEventFactory, bufferSize, executor, ProducerType.SINGLE, new BlockingWaitStrategy());
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.start();

        RingBuffer ringBuffer = disruptor.getRingBuffer();
        for (int x = 0; x < 256; x++) {
            long sequence = ringBuffer.next();
            try {
                LongEvent event = ringBuffer.get(sequence);
                event.setValue(x);
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }

    static class LongEvent {
        private long value;

        public long getValue() {
            return value;
        }

        public void setValue(long value) {
            this.value = value;
        }
    }

    static class LongEventFactory implements EventFactory {
        @Override
        public LongEvent newInstance() {
            return new LongEvent();
        }
    }

    static class LongEventHandler implements EventHandler {
        @Override
        public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception {
            System.out.println("Event:" + longEvent.getValue() + ", sequence: " + sequence);
        }
    }
}
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executors;


public class MultiTest {
    public static void main(String[] args) {
        Disruptor disruptor = new Disruptor(
                new LongEventFactory(),
                getRingBufferSize(500),
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()),
                ProducerType.MULTI, // ProducerType.SINGLE  单生产者; ProducerType.MULTI   多生产者
                // BlockingWaitStrategy:最低效的策略,但对CPU的消耗最小;
                // SleepingWaitStrategy:与BlockingWaitStrategy类似,合用于异步日志类似的场景;
                // YieldingWaitStrategy 性能最好,要求事件处理线数小于 CPU 逻辑核心数
                new YieldingWaitStrategy());
        disruptor.handleEventsWithWorkerPool(new LongEventHandler[]{new LongEventHandler("c1"), new LongEventHandler("c2")});
        RingBuffer ringBuffer = disruptor.start();

        //生产者1
        new Thread(() -> {
            publish(ringBuffer, 0, 10);
        }).start();

        // 生产者2
        new Thread(() -> {
            publish(ringBuffer, 10, 20);
        }).start();
    }

    private static void publish(RingBuffer ringBuffer, int start, int end) {
        for (int x = start; x < end; x++) {
            long sequence = ringBuffer.next();
            try {
                LongEvent event = ringBuffer.get(sequence);
                event.setValue(x);
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }

    private static int getRingBufferSize(int num) {
        int s = 2;
        while (s < num) {
            s <<= 1;
        }
        return s;
    }

    static class LongEvent {
        private long value;

        public long getValue() {
            return value;
        }

        public void setValue(long value) {
            this.value = value;
        }
    }

    static class LongEventFactory implements EventFactory {
        @Override
        public LongEvent newInstance() {
            return new LongEvent();
        }
    }

    static class LongEventHandler implements EventHandler, WorkHandler {
        private String consumerId;
        public LongEventHandler(String consumerId){
            this.consumerId = consumerId;
        }

        @Override
        public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception {
            System.out.println("Event:" + longEvent.getValue() + ", sequence: " + sequence);
        }

        @Override
        public void onEvent(LongEvent longEvent) throws Exception {
            System.out.println(this.consumerId + " consume:" + longEvent.getValue());
        }
    }
}
3.TODO

        用 Disruptor 替代 java jdk 提供的有界内存队列好处显而易见,之前使用java现实了一个基于Actor模型的RPC,在Actor模型中的收件箱、发件箱之前采用linkedBlockingQueue,后续计划使用Disruptor进行替代。关于ActorRpc详见:

用JAVA实现基于Actor模型的RPC

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/769789.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号