前言一、disruptor概述二、disruptor原理三、disruptor实现生产消费
1、maven依赖2、entity -> LongEvent.java3、factory -> LongEventFactory.java4.1、consumer -> LongEventHandler4.2、consumer -> LongEventHandler25、producer -> LongEventProducer.java6、Main.java7、一个生产者,多个消费者测试
前言参考并发编程网:https://ifeve.com/disruptor/ 一、disruptor概述
这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中,使`用事件源驱动方式。业务逻辑处理器的核心是Disruptor。Disruptor它是一个开源的并发框架Disruptor框架最主要的优点:高性能队列,无锁机制,底层使用CAS实现,基于事件驱动源 二、disruptor原理
RingBuffer:见名知意 -> 环形缓冲区假设RingBuffer的总大小为10,当我们需要取12的元素时通过计算12%10=2,可以快速定位到2的元素这个就是为什么disruptor可以实现高性能队列的原因
三、disruptor实现生产消费
1、maven依赖
2、entity -> LongEvent.javacom.lmax disruptor 3.2.1
//声明一个event 生产者与消费者传递数据类型
public class LongEvent {
private Long value;
public Long getValue() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
}
3、factory -> LongEventFactory.java
import com.lmax.disruptor.EventFactory; // EventFactory 实例化LongEvent public class LongEventFactory implements EventFactory4.1、consumer -> LongEventHandler{ public LongEvent newInstance() { return new LongEvent(); } }
import com.lmax.disruptor.EventHandler; //消费者获取生产推送数据 public class LongEventHandler implements EventHandler4.2、consumer -> LongEventHandler2{ public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("消费者1 获取生产者数据..event:" + event.getValue()); } }
import com.lmax.disruptor.EventHandler; //消费者获取生产推送数据 public class LongEventHandler2 implements EventHandler5、producer -> LongEventProducer.java{ public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("消费者2 获取生产者数据..event:" + event.getValue()); } }
import java.nio.ByteBuffer;
import com.lmax.disruptor.RingBuffer;
// 生产者
public class LongEventProducer {
private RingBuffer ringBuffer;
public LongEventProducer(RingBuffer ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 获取事件队列 下标位置
long sequence = ringBuffer.next();
try {
// 取出空队列(Event)
LongEvent longEvent = ringBuffer.get(sequence);
// 给空队列赋值
longEvent.setValue(byteBuffer.getLong(0));
} catch (Exception e) {
// TODO: handle exception
} finally {
System.out.println("生产者发送数据...");
// 发送数据
ringBuffer.publish(sequence);
}
}
}
6、Main.java
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class Main {
public static void main(String[] args) {
// 1.创建可以缓存的线程池,提供发给consumer
ExecutorService executor = Executors.newCachedThreadPool();
// 2.创建 Event工厂
EventFactory eventFactory = new LongEventFactory();
// 3.创建ringbuffer大小
int ringbuffer = 1024 * 1024;// 2的N次方。
// 4.创建Disruptor
Disruptor disruptor = new Disruptor(eventFactory, ringbuffer, executor,
ProducerType.MULTI, new YieldingWaitStrategy());
// 5.连接消费者---注册消费者
disruptor.handleEventsWith(new LongEventHandler());
disruptor.handleEventsWith(new LongEventHandler2());
// 多个消费者 一个生产者 默认重复消费、配置分组
// 6.启动
disruptor.start();
// 7.创建RingBuffer容器
RingBuffer ringBuffer = disruptor.getRingBuffer();
// 8.创建生产者
LongEventProducer producer = new LongEventProducer(ringBuffer);
// 9.指定缓冲区大小
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (int i = 1; i < 100; i++) {
byteBuffer.putLong(0, i);
producer.onData(byteBuffer);
}
executor.shutdown();
disruptor.shutdown();
}
}
7、一个生产者,多个消费者测试
" />



