栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Disruptor极速体验

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Disruptor极速体验

一、简单了解下Disruptor

• Disruptor是一个在获得尽量高的吞吐量(TPS)和尽量低的延迟的前提下设计而出的一种“生产者-消费者”模型。
• Disruptor核心为RingBuffer,称之为环形缓冲区。
• Disruptor在多线程场景下“不用锁”。

二、极速体验

首先创建Maven项目,引入工程

        
            com.lmax
            disruptor
            3.4.4
        

(一)单生产者和单消费者模式

  1. 创建事件对象(数据体)
public class LongEvent {
	private Long value;
	public Long getValue() {
		return value;
	}
	public void setValue(Long value) {
		this.value = value;
	}
}
  1. 定义事件工厂(生产数据体的工厂)
public class LongEventFactory implements EventFactory {
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}
  1. 创建消费者
public class LongEventConsumer implements EventHandler {
    @Override
    public void onEvent(LongEvent longEvent, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("消费者:" + longEvent.getValue() + "  sequence:" + sequence + " endOfBatch:" + endOfBatch);
        // 业务中的数据处理应该写在这个地方...
    }
}
  1. 创建生产者
public class LongEventProducer {
    // 核心组件RingBuffer
    private final RingBuffer ringBuffer;

    public LongEventProducer(RingBuffer ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    // 生产数据的方法,往环形数组中放数据
    public void onData(Integer val) {
        //可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
        long sequence = ringBuffer.next();
        try {
            //用上面的索引取出一个空的事件用于填充
            LongEvent l = ringBuffer.get(sequence);
            l.setValue(Long.valueOf(val));
            System.out.println("生产者生产了数据:" + val);
        } finally {
            //注意,最后的ringbuffer.publish方法必须包含在finally中以确保必须得到调用;
            ringBuffer.publish(sequence);
        }
    }
}
  1. 创建测试类测试
public class TestSingle {
    public static void main(String[] args) {
        // 创建一个线程工厂提供线程来触发消费者的事件处理
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 创建生成事件的工厂
        LongEventFactory factory = new LongEventFactory();
        // 定义缓冲区大小,一定要是2的N次方
        int bufferSize = 1024;
        // 创建Disruptor
        Disruptor disruptor = new Disruptor<>(
                factory,//事件(数据)工厂
                bufferSize, // 环形数组的大小
                threadFactory, // 线程工厂
                ProducerType.SINGLE, // 单个生产者
                new YieldingWaitStrategy() // 等待策略
        );
        // 连接消费端方法
        disruptor.handleEventsWith(new LongEventConsumer());
        // 启动
        disruptor.start();
        // 创建Ringbuffer容器
        RingBuffer ringBuffer = disruptor.getRingBuffer();
        // 创建生产者
        LongEventProducer producer = new LongEventProducer(ringBuffer);
        // 生产数据
        for (int i = 0; i < 100; i++) {
            producer.onData(i);
        }
        // 关闭
        disruptor.shutdown();
    }
}
  1. 测试结果

    (二)多生产者和多消费者模式
  2. 创建事件对象
public class OrderEvent {
    private String id;//ID
    private String name;
    private double price;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}
  1. 创建事件工厂
public class OrderEventFactory implements EventFactory {
    @Override
    public OrderEvent newInstance() {
        return new OrderEvent();
    }
}
  1. 创建消费者(此处与单消费者有差异,需要实现的是WorkHandler)
public class OrderEventConsumer implements WorkHandler {
    private String consumerId;
    private AtomicInteger count = new AtomicInteger(0); // 记录下当前消费者消费的次数

    public OrderEventConsumer(String consumerId) {
        this.consumerId = consumerId;
    }

    @Override
    public void onEvent(OrderEvent order) {
        System.out.println("消费者:" + consumerId + ", 消费了消息:" + order.getId());
        count.incrementAndGet(); // 消费次数加1
    }

    public int getCount() {
        return count.get();
    }
}
  1. 创建生产者
public class OrderEventProducer {
    private String producerId;
    private final RingBuffer ringBuffer;

    public String getConsumerId() {
        return consumerId;
    }

    public OrderEventProducer(String producerId, RingBuffer ringBuffer) {
        this.producerId = producerId;
        this.ringBuffer = ringBuffer;
    }

    
    public void onData(String data) {
        //可以把ringbuffer看做一个事件队列,那么next就是得到下面一个事件槽
        long sequence = ringBuffer.next();
        try {
            //用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
            OrderEvent order = ringBuffer.get(sequence);
            order.setId(data);
            System.out.println("生产者:" + producerId + " ,生产了数据:" + order.getId());
        } finally {
            //注意,最后的ringbuffer.publish方法必须包含在finally中以确保必须得到调用;
            ringBuffer.publish(sequence);
        }
    }
}
  1. 创建测试类测试
public class TestMulti {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个线程工厂提供线程来触发消费者的事件处理
        ThreadFactory producerFactory = Executors.defaultThreadFactory();
        // 创建生成事件的工厂
        OrderEventFactory eventFactory = new OrderEventFactory();
        // 创建bufferSize ,也就是RingBuffer大小,必须是2的N次方
        int ringBufferSize = 1024 * 1024;
        Disruptor disruptor = new Disruptor<>(
                eventFactory, // 事件工厂
                ringBufferSize, // 环形数组大小
                producerFactory, // 线程工厂
                ProducerType.MULTI, // 多生产者模式
                new YieldingWaitStrategy() // 等待策略
        );
        RingBuffer ringBuffer = disruptor.getRingBuffer();
        // 创建10个消费者来处理同一个生产者发的消息(这10个消费者不重复消费消息)
        OrderEventConsumer[] consumers = new OrderEventConsumer[10];
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new OrderEventConsumer("C" + i);
        }
        // 连接到消费者
        disruptor.handleEventsWithWorkerPool(consumers);
        // 启动
        disruptor.start();
        // 此处创建一个计数器,保证多生产者同时开始生产数据
        final CountDownLatch latch = new CountDownLatch(1);
        // 此处创建100个生产者,每个生产者生产100条数据
        for (int i = 0; i < 100; i++) {
            OrderEventProducer orderEventProducer = new OrderEventProducer("P" + i, ringBuffer);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    for (int j = 0; j < 100; j++) {
                        orderEventProducer.onData(UUID.randomUUID().toString() + "-----" + j);
                    }
                }
            }).start();
        }
        Thread.sleep(2000);
        System.out.println("----------开始生产--------------");
        latch.countDown();//同时进行生产
        Thread.sleep(10000);
        // 统计下消费次数
        int count = 0;
        for (OrderEventConsumer consumer : consumers) {
            System.err.println("消费者:" + consumer.getConsumerId() + " ,消费了:" + consumer.getCount() + "次");
            count += consumer.getCount();
        }
        System.err.println("count = " + count);
        disruptor.shutdown();
    }
  1. 测试结果
三、参考文章

Disruptor 介绍

写在后面:本文为个人学习过程,仅限于学习使用。学习使我快乐~
希望文章可以帮到大家,路过大神不喜勿喷,我们共同学习,加油吧!!!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/462835.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号