栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

七、Disruptor框架

七、Disruptor框架

.

前言一、disruptor概述二、disruptor原理三、disruptor实现生产消费

1、maven依赖2、entity -> LongEvent.java3、factory -> LongEventFactory.java4.1、consumer -> LongEventHandler4.2、consumer -> LongEventHandler25、producer -> LongEventProducer.java6、Main.java7、一个生产者,多个消费者测试

前言

参考并发编程网:https://ifeve.com/disruptor/ 一、disruptor概述

这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中,使`用事件源驱动方式。业务逻辑处理器的核心是Disruptor。Disruptor它是一个开源的并发框架Disruptor框架最主要的优点:高性能队列,无锁机制,底层使用CAS实现,基于事件驱动源 二、disruptor原理

RingBuffer:见名知意 -> 环形缓冲区假设RingBuffer的总大小为10,当我们需要取12的元素时通过计算12%10=2,可以快速定位到2的元素这个就是为什么disruptor可以实现高性能队列的原因
三、disruptor实现生产消费 1、maven依赖

	
		
			com.lmax
			disruptor
			3.2.1
		
	
2、entity -> LongEvent.java
//声明一个event 生产者与消费者传递数据类型
public class LongEvent {

	private Long value;

	public Long getValue() {
		return value;
	}

	public void setValue(Long value) {
		this.value = value;
	}
}
3、factory -> LongEventFactory.java
import com.lmax.disruptor.EventFactory;

// EventFactory 实例化LongEvent
public class LongEventFactory implements EventFactory {

	public LongEvent newInstance() {

		return new LongEvent();
	}
}
4.1、consumer -> LongEventHandler
import com.lmax.disruptor.EventHandler;

//消费者获取生产推送数据
public class LongEventHandler implements EventHandler {

	public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
		System.out.println("消费者1 获取生产者数据..event:" + event.getValue());
	}
}
4.2、consumer -> LongEventHandler2
import com.lmax.disruptor.EventHandler;

//消费者获取生产推送数据
public class LongEventHandler2 implements EventHandler {

	public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
		System.out.println("消费者2 获取生产者数据..event:" + event.getValue());
	}
}
5、producer -> LongEventProducer.java
import java.nio.ByteBuffer;
import com.lmax.disruptor.RingBuffer;

// 生产者
public class LongEventProducer {
	private RingBuffer ringBuffer;

	public LongEventProducer(RingBuffer ringBuffer) {
		this.ringBuffer = ringBuffer;
	}

	public void onData(ByteBuffer byteBuffer) {
		// 获取事件队列 下标位置
		long sequence = ringBuffer.next();
		try {
			// 取出空队列(Event)
			LongEvent longEvent = ringBuffer.get(sequence);
			// 给空队列赋值
			longEvent.setValue(byteBuffer.getLong(0));
		} catch (Exception e) {
			// TODO: handle exception
		} finally {
			System.out.println("生产者发送数据...");
			// 发送数据
			ringBuffer.publish(sequence);
		}
	}
}
6、Main.java
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {

	public static void main(String[] args) {
		// 1.创建可以缓存的线程池,提供发给consumer
		ExecutorService executor = Executors.newCachedThreadPool();
		// 2.创建 Event工厂
		EventFactory eventFactory = new LongEventFactory();
		// 3.创建ringbuffer大小
		int ringbuffer = 1024 * 1024;// 2的N次方。
		// 4.创建Disruptor
		Disruptor disruptor = new Disruptor(eventFactory, ringbuffer, executor,
				ProducerType.MULTI, new YieldingWaitStrategy());
		// 5.连接消费者---注册消费者
		disruptor.handleEventsWith(new LongEventHandler());
		disruptor.handleEventsWith(new LongEventHandler2());
		// 多个消费者 一个生产者 默认重复消费、配置分组
		// 6.启动
		disruptor.start();
		// 7.创建RingBuffer容器
		RingBuffer ringBuffer = disruptor.getRingBuffer();
		// 8.创建生产者
		LongEventProducer producer = new LongEventProducer(ringBuffer);
		// 9.指定缓冲区大小
		ByteBuffer byteBuffer = ByteBuffer.allocate(8);
		for (int i = 1; i < 100; i++) {
			byteBuffer.putLong(0, i);
			producer.onData(byteBuffer);
		}
		executor.shutdown();
		disruptor.shutdown();
	}
}
7、一个生产者,多个消费者测试

" />

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762360.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号