一、Java锁机制
1.自旋转锁与互斥锁区别2.公平与非公平锁 二、Disruptor
1.disruptor原理2.创建disruptor消费端与生产者
一、Java锁机制 1.自旋转锁与互斥锁区别
2.公平与非公平锁互斥锁:线程会从sleep(加锁)➡running(解锁),过程中有上下文的切换,CPU的抢占,信号的发送等开销。
自旋锁:线程一直是running(加锁➡解锁),死循环检测锁的标志位,机制不复杂。
Pom Maven依赖信息
com.lmax disruptor 3.2.1
package com.itmayiedu.entity;
//声明一个event 生产者与消费者传递数据类型
public class LongEvent {
private Long value;
public Long getValue() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
}
package com.itmayiedu.factory; import com.itmayiedu.entity.LongEvent; import com.lmax.disruptor.EventFactory; // EventFactory 实例化LongEvent public class LongEventFactory implements EventFactory{ public LongEvent newInstance() { return new LongEvent(); } }
package com.itmayiedu.consumer; import com.itmayiedu.entity.LongEvent; import com.lmax.disruptor.EventHandler; //消费者获取生产推送数据 public class LongEventHandler implements EventHandler{ public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("消费者1 获取生产者数据..event:" + event.getValue()); } }
package com.itmayiedu.consumer; import com.itmayiedu.entity.LongEvent; import com.lmax.disruptor.EventHandler; //消费者获取生产推送数据 public class LongEventHandler2 implements EventHandler{ public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("消费者2 获取生产者数据..event:" + event.getValue()); } }
package com.itmayiedu.producer;
import java.nio.ByteBuffer;
import com.itmayiedu.entity.LongEvent;
import com.lmax.disruptor.RingBuffer;
// 生产者
public class LongEventProducer {
private RingBuffer ringBuffer;
public LongEventProducer(RingBuffer ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 获取事件队列 下标位置
long sequence = ringBuffer.next();
try {
// 取出空队列(Event)
LongEvent longEvent = ringBuffer.get(sequence);
// 给空队列赋值
longEvent.setValue(byteBuffer.getLong(0));
} catch (Exception e) {
// TODO: handle exception
} finally {
System.out.println("生产者发送数据...");
// 发送数据
ringBuffer.publish(sequence);
}
}
}
package com.itmayiedu.producer;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.itmayiedu.consumer.LongEventHandler;
import com.itmayiedu.consumer.LongEventHandler2;
import com.itmayiedu.entity.LongEvent;
import com.itmayiedu.factory.LongEventFactory;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class Main {
public static void main(String[] args) {
// 1.创建可以缓存的线程池,提供发给consumer
ExecutorService executor = Executors.newCachedThreadPool();
// 2.创建 Event工厂
EventFactory eventFactory = new LongEventFactory();
// 3.创建ringbuffer大小
int ringbuffer = 1024 * 1024;// 2的N次方。
// 4.创建Disruptor
Disruptor disruptor = new Disruptor(eventFactory, ringbuffer, executor,
ProducerType.MULTI, new YieldingWaitStrategy());
// 5.连接消费者---注册消费者
disruptor.handleEventsWith(new LongEventHandler());
disruptor.handleEventsWith(new LongEventHandler2());
// 多个消费者 一个生产者 默认重复消费、配置分组
// 6.启动
disruptor.start();
// 7.创建RingBuffer容器
RingBuffer ringBuffer = disruptor.getRingBuffer();
// 8.创建生产者
LongEventProducer producer = new LongEventProducer(ringBuffer);
// 9.指定缓冲区大小
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (int i = 1; i < 100; i++) {
byteBuffer.putLong(0, i);
producer.onData(byteBuffer);
}
executor.shutdown();
disruptor.shutdown();
}
}



