目录
一、秒杀流程
二、代码示例
1. 依赖及表结构
2. 秒杀下单
3. 本地处理队列
4. 事件对象
5. 消费者
6. 异常处理消费者
7. 事件生产者
8. 创建Disruptor
三、参考资料
一、秒杀流程
二、代码示例
1. 依赖及表结构
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>
-- Goods stock table: one row per goods item with its remaining stock and unit price.
CREATE TABLE `t_goods_stock` (
  `id` bigint(4) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `goods_id` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '商品ID',
  `stock` bigint(255) NULL DEFAULT NULL COMMENT '库存',
  `price` decimal(8, 2) NULL DEFAULT NULL COMMENT '单价',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- Order table: one row per seckill order.
-- NOTE(review): `goods_id` is bigint(20) here but varchar(16) in `t_goods_stock`; confirm which type is intended.
CREATE TABLE `t_order` (
  `id` bigint(64) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '用户ID',
  `goods_id` bigint(20) NULL DEFAULT NULL COMMENT '商品ID',
  `number` int(11) NULL DEFAULT NULL COMMENT '购买数量',
  `price` decimal(8, 2) NULL DEFAULT NULL COMMENT '单价',
  `total` decimal(8, 2) NULL DEFAULT NULL COMMENT '总价',
  `create_by` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '创建人',
  `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
  `update_by` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '修改人',
  `update_time` datetime(0) NULL DEFAULT NULL COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 61686 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
2. 秒杀下单
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>
-- Goods stock table: one row per goods item with its remaining stock and unit price.
CREATE TABLE `t_goods_stock` (
  `id` bigint(4) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `goods_id` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '商品ID',
  `stock` bigint(255) NULL DEFAULT NULL COMMENT '库存',
  `price` decimal(8, 2) NULL DEFAULT NULL COMMENT '单价',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- Order table: one row per seckill order.
-- NOTE(review): `goods_id` is bigint(20) here but varchar(16) in `t_goods_stock`; confirm which type is intended.
CREATE TABLE `t_order` (
  `id` bigint(64) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '用户ID',
  `goods_id` bigint(20) NULL DEFAULT NULL COMMENT '商品ID',
  `number` int(11) NULL DEFAULT NULL COMMENT '购买数量',
  `price` decimal(8, 2) NULL DEFAULT NULL COMMENT '单价',
  `total` decimal(8, 2) NULL DEFAULT NULL COMMENT '总价',
  `create_by` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '创建人',
  `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
  `update_by` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '修改人',
  `update_time` datetime(0) NULL DEFAULT NULL COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 61686 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
2. 秒杀下单
a. Controller层
package com.common.instance.test.controller;
import com.common.instance.test.core.Response;
import com.common.instance.test.service.SeckillService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Random;
/**
 * REST entry point for the seckill (flash-sale) demo.
 */
@RestController
@RequestMapping("/seckill")
@Api(tags = "秒杀活动")
public class SeckillController {

    @Resource
    private SeckillService seckillService;

    /**
     * Attempts a seckill purchase of the given goods on behalf of a simulated user.
     *
     * <p>The user ID is derived from the current timestamp and the purchase quantity
     * is a random value in the range 1..10, so every call behaves like a new buyer.
     *
     * @param goodsId ID of the goods to purchase
     * @return a success response carrying whether the goods were grabbed, or an
     *         error response if the service call threw
     */
    @GetMapping("/isSeckillGoods")
    @ApiOperation("用户是否抢到商品")
    public Response isSeckillGoods(String goodsId) {
        final Boolean grabbed;
        try {
            // Simulated buyer: the timestamp stands in for a user ID.
            final String simulatedUserId = Long.toString(System.currentTimeMillis());
            // Simulated quantity between 1 and 10 inclusive.
            final int quantity = 1 + new Random().nextInt(10);
            grabbed = seckillService.isSeckillGoods(simulatedUserId, goodsId, quantity);
        } catch (Exception ex) {
            ex.printStackTrace();
            return Response.error();
        }
        return Response.success(grabbed);
    }
}
b. Service层
package com.common.instance.test.service.impl;
import com.common.instance.test.dao.TGoodsStockDao;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import com.common.instance.test.disruptor.localQueue.RedisQueue;
import com.common.instance.test.entity.TGoodsStock;
import com.common.instance.test.service.SeckillService;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
 * Admission step of the seckill flow: reserves stock in a Redis counter and, on
 * success, enqueues the request on the Redis wait queue for asynchronous processing.
 */
@Service
public class SeckillServiceImpl implements SeckillService {

    @Resource
    private RedisTemplate redisTemplate;

    @Resource
    private TGoodsStockDao tGoodsStockDao;

    /**
     * Tries to reserve {@code num} units of the goods for the user.
     *
     * @param userId  ID of the buyer
     * @param goodsId ID of the goods; appended to {@link RedisQueue#SECKILL_KEY} to form the counter key
     * @param num     quantity requested; must be positive
     * @return {@code true} if the reservation succeeded and the request was queued,
     *         {@code false} otherwise (including for missing or non-positive arguments)
     */
    @Override
    public Boolean isSeckillGoods(String userId, String goodsId, Integer num) {
        // Reject malformed requests up front: a null quantity would fail on unboxing and a
        // non-positive quantity would turn the decrement below into a stock increase.
        if (Objects.isNull(userId) || Objects.isNull(goodsId) || Objects.isNull(num) || num <= 0) {
            return false;
        }

        Boolean result = false;
        String stockKey = RedisQueue.SECKILL_KEY + goodsId;

        // Reserve the requested quantity in the Redis counter.
        Long count = redisTemplate.opsForValue().decrement(stockKey, num);

        // A negative counter triggers a reload of the counter from the database.
        // NOTE(review): the stock row is looked up by the hard-coded primary key 1L rather than
        // by goodsId, and this reload is not atomic with respect to concurrent requests.
        if (Objects.nonNull(count) && count < 0) {
            TGoodsStock goodsStock = tGoodsStockDao.selectByPrimaryKey(1L);
            if (Objects.nonNull(goodsStock)) {
                count = goodsStock.getStock() - num;
                redisTemplate.opsForValue().set(stockKey, count, 1, TimeUnit.DAYS);
            }
        }

        if (Objects.nonNull(count) && count >= 0) {
            // Build the seckill event and push it onto the wait queue.
            SeckillGoodsEvent seckillGoodsEvent = new SeckillGoodsEvent();
            seckillGoodsEvent.setUserId(userId);
            seckillGoodsEvent.setGoodsId(goodsId);
            seckillGoodsEvent.setNum(num);
            redisTemplate.opsForList().leftPush(RedisQueue.WAIT_QUEUE, seckillGoodsEvent);
            result = true;
        }
        return result;
    }
}
3. 本地处理队列
a. 定义Redis队列
package com.common.instance.test.disruptor.localQueue;
import org.springframework.stereotype.Component;
/**
 * Redis key names used by the seckill flow. Constants only; not instantiable.
 */
public final class RedisQueue {

    /** Prefix of the per-goods seckill quantity counter key; the goods ID is appended. */
    public static final String SECKILL_KEY = "SECKILL:GOODS_ID:";

    /** Prefix of the lock key guarding the seckill quantity. */
    public static final String SECKILL_LOCK = "SECKILL:LOCK:";

    /** List holding accepted seckill events that are waiting to be processed. */
    public static final String WAIT_QUEUE = "SECKILL:QUEUE:WAIT_QUEUE";

    /** List holding seckill events that are currently being processed. */
    public static final String PROCESSING_QUEUE = "SECKILL:QUEUE:PROCESSING_QUEUE";

    /** List holding seckill events whose processing failed. */
    public static final String FAILED_QUEUE = "SECKILL:QUEUE:FAILED_QUEUE";

    /** Mirror list that receives a copy of every event taken from the wait queue. */
    public static final String IMAGE_QUEUE = "SECKILL:QUEUE:IMAGE_QUEUE";

    private RedisQueue() {
        // Constants holder; no instances.
    }
}
b. 本地处理队列
package com.common.instance.test.disruptor.localQueue;
import com.alibaba.fastjson.JSON;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * In-process buffer between the Redis wait queue and the Disruptor publisher.
 *
 * <p>A background thread started from {@link #create()} moves events from
 * {@link RedisQueue#WAIT_QUEUE} into the local queue and records each moved event
 * on the processing and mirror lists.
 */
@Component
public class MyEventQueue {

    /** Pause between polls when there is nothing to move or the local queue is full. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    @Resource
    private RedisTemplate redisTemplate;

    // Local event queue
    private LinkedBlockingQueue<SeckillGoodsEvent> myEventQueue;

    // Maximum number of elements kept in the local queue
    private int capacity = 100000;

    /** Creates the local queue and starts the Redis poller thread. */
    @PostConstruct
    public void create() {
        createEventQueue();
        getWaitQueue();
    }

    private void createEventQueue() {
        myEventQueue = new LinkedBlockingQueue<>();
    }

    private void getWaitQueue() {
        new Thread(new MyThread(), "seckill-wait-queue-poller").start();
    }

    public LinkedBlockingQueue<SeckillGoodsEvent> getMyEventQueue() {
        return myEventQueue;
    }

    public void setMyEventQueue(LinkedBlockingQueue<SeckillGoodsEvent> myEventQueue) {
        this.myEventQueue = myEventQueue;
    }

    /** Poller that drains the Redis wait queue into the local queue until interrupted. */
    class MyThread implements Runnable {

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                boolean idle;
                try {
                    idle = !transferOneEvent();
                } catch (RuntimeException e) {
                    // A Redis/serialization failure must not kill the poller; back off and retry.
                    e.printStackTrace();
                    idle = true;
                }
                if (idle) {
                    try {
                        Thread.sleep(IDLE_SLEEP_MILLIS);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop polling.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }

        /**
         * Moves at most one event from the Redis wait queue to the local queue.
         *
         * @return {@code true} if an event was moved, {@code false} if the local queue
         *         is at capacity or the wait queue is empty
         */
        private boolean transferOneEvent() {
            // Only fetch more work while the local queue is below capacity.
            if (myEventQueue.size() >= capacity) {
                return false;
            }
            Object o = redisTemplate.opsForList().rightPop(RedisQueue.WAIT_QUEUE);
            if (Objects.isNull(o)) {
                return false;
            }
            // Round-trip through JSON to obtain a SeckillGoodsEvent from whatever Redis returned.
            String event = JSON.toJSONString(o);
            SeckillGoodsEvent seckillGoodsEvent = JSON.parseObject(event, SeckillGoodsEvent.class);
            // Add to the local queue
            myEventQueue.offer(seckillGoodsEvent);
            // Record on the processing list
            redisTemplate.opsForList().leftPush(RedisQueue.PROCESSING_QUEUE, seckillGoodsEvent);
            // Record on the mirror list
            redisTemplate.opsForList().leftPush(RedisQueue.IMAGE_QUEUE, seckillGoodsEvent);
            return true;
        }
    }
}
4. 事件对象
package com.common.instance.test.disruptor.event;
import lombok.Data;
/**
 * Payload describing a single seckill purchase request as it moves through the
 * Redis queues and the Disruptor ring buffer.
 */
@Data
public class SeckillGoodsEvent {
// ID of the user placing the order
private String userId;
// ID of the goods being purchased
private String goodsId;
// Quantity of goods being purchased
private Integer num;
// ID of the order associated with this request
private Long orderId;
}
5. 消费者
a. 创建订单消费者
package com.common.instance.test.disruptor.consumer; import com.common.instance.test.dao.TGoodsStockDao; import com.common.instance.test.dao.TOrderDao; import com.common.instance.test.disruptor.event.SeckillGoodsEvent; import com.common.instance.test.entity.TGoodsStock; import com.common.instance.test.entity.TOrder; import com.lmax.disruptor.EventHandler; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.math.BigDecimal; import java.util.Date; @Component public class CreateOrderHandler implements EventHandler{ @Resource private TOrderDao tOrderDao; @Resource private TGoodsStockDao tGoodsStockDao; @Override public void onEvent(SeckillGoodsEvent seckillGoodsEvent, long sequence, boolean endOfBatch) throws Exception { // 获取商品信息 TGoodsStock tGoodsStock = tGoodsStockDao.selectByPrimaryKey(1L); // 组装订单对象 TOrder order = createOrder(seckillGoodsEvent, tGoodsStock); // 保存订单 tOrderDao.insert(order); // 秒杀活动获取订单ID seckillGoodsEvent.setOrderId(order.getId()); } // 组装订单对象 private TOrder createOrder(SeckillGoodsEvent seckillGoodsEvent, TGoodsStock tGoodsStock){ TOrder order = new TOrder(); order.setUserId(seckillGoodsEvent.getUserId()); order.setGoodsId(Long.valueOf(seckillGoodsEvent.getGoodsId())); order.setNumber(seckillGoodsEvent.getNum()); order.setPrice(tGoodsStock.getPrice()); // 计算总价 BigDecimal total = new BigDecimal("0"); total = total.add(order.getPrice().multiply(new BigDecimal(order.getNumber()))); order.setTotal(total); Date date = new Date(); order.setCreateBy("admin"); order.setCreateTime(date); order.setUpdateBy("admin"); order.setUpdateTime(date); return order; } }
b. 发送订单kafka消息
package com.common.instance.test.disruptor.consumer; import com.alibaba.fastjson.JSONObject; import com.common.instance.test.disruptor.event.SeckillGoodsEvent; import com.lmax.disruptor.EventHandler; import com.log.util.LogUtil; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import javax.annotation.Resource; @Component public class KafkaHandler implements EventHandler{ @Resource private KafkaTemplate kafkaTemplate; public static final String TOPIC_TEST = "SeckillGoodsEvent"; @Override public void onEvent(SeckillGoodsEvent seckillGoodsEvent, long sequence, boolean endOfBatch) throws Exception { send(seckillGoodsEvent); } // 发送kafka消息 public void send(SeckillGoodsEvent seckillGoodsEvent) { String obj2String = JSONObject.toJSonString(seckillGoodsEvent); // 发送消息 ListenableFuture > future = kafkaTemplate.send(TOPIC_TEST, obj2String); future.addCallback(new ListenableFutureCallback >() { @Override public void onFailure(Throwable throwable) { //发送失败的处理 LogUtil.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage()); } @Override public void onSuccess(SendResult stringObjectSendResult) { //成功的处理 LogUtil.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString()); } }); } }
c. 扣减库存
package com.common.instance.test.disruptor.consumer; import com.common.instance.test.dao.TGoodsStockDao; import com.common.instance.test.dao.TOrderDao; import com.common.instance.test.disruptor.event.SeckillGoodsEvent; import com.common.instance.test.entity.TGoodsStock; import com.common.instance.test.entity.TOrder; import com.lmax.disruptor.EventHandler; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Objects; @Component public class ReduceStockHandler implements EventHandler{ @Resource private TOrderDao tOrderDao; @Resource private TGoodsStockDao tGoodsStockDao; @Override public void onEvent(SeckillGoodsEvent event, long sequence, boolean endOfBatch) throws Exception { // 获取秒杀订单 TOrder order = tOrderDao.selectByPrimaryKey(event.getOrderId()); // 获取商品库存 TGoodsStock goodsStock = tGoodsStockDao.selectByPrimaryKey(1L); if (Objects.nonNull(order) && Objects.nonNull(goodsStock)){ // 计算库存 Long stock = goodsStock.getStock() - order.getNumber(); goodsStock.setStock(stock); // 更新库存 tGoodsStockDao.updateByPrimaryKey(goodsStock); } } }
d. 移除正处理队列元素
package com.common.instance.test.disruptor.consumer; import com.alibaba.fastjson.JSON; import com.common.instance.test.disruptor.event.SeckillGoodsEvent; import com.common.instance.test.disruptor.localQueue.RedisQueue; import com.lmax.disruptor.*; import com.log.util.LogUtil; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; @Component public class RemoveProcessingQueueHandler implements EventHandler{ @Resource private RedisTemplate redisTemplate; @Override public void onEvent(SeckillGoodsEvent seckillGoodsEvent, long l, boolean b) throws Exception { // 事件处理完成后,从正在处理队列中移除 Object event = redisTemplate.opsForList().rightPop(RedisQueue.PROCESSING_QUEUE); LogUtil.info( l + "removeProcessing: " + JSON.toJSonString(event)); } }
6. 异常处理消费者
package com.common.instance.test.disruptor.consumer;
import com.alibaba.fastjson.JSON;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import com.common.instance.test.disruptor.localQueue.RedisQueue;
import com.lmax.disruptor.ExceptionHandler;
import com.log.util.LogUtil;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class MyExceptionHandler implements ExceptionHandler {
@Resource
private RedisTemplate redisTemplate;
@Override
public void handleEventException(Throwable ex, long sequence, SeckillGoodsEvent seckillGoodsEvent) {
try {
// 移除正在处理队列事件
redisTemplate.opsForList().rightPop(RedisQueue.PROCESSING_QUEUE);
// 添加到失败队列事件
redisTemplate.opsForList().leftPush(RedisQueue.FAILED_QUEUE, seckillGoodsEvent);
// 记录处理异常
LogUtil.error("MyExceptionHandler.handleEventException()", (Exception) ex);
} catch (Exception e) {
LogUtil.error("MyExceptionHandler.handleEventException()",
"PROCESSING_QUEUE into FAILED_QUEUE failed: " + JSON.toJSonString(seckillGoodsEvent),
e);
}
}
@Override
public void handleonStartException(Throwable ex) {
LogUtil.error("MyExceptionHandler.handleonStartException()", (Exception) ex);
}
@Override
public void handleonShutdownException(Throwable ex) {
LogUtil.error("MyExceptionHandler.handleonShutdownException()", (Exception) ex);
}
}
7. 事件生产者
package com.common.instance.test.disruptor.publisher;
import com.alibaba.fastjson.JSON;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import com.common.instance.test.disruptor.localQueue.MyEventQueue;
import com.lmax.disruptor.EventTranslator;
import com.log.util.LogUtil;
import java.util.Objects;
/**
 * Disruptor event translator that fills a ring-buffer slot from the next event in
 * the local queue.
 */
public class MyEventTranslator implements EventTranslator<SeckillGoodsEvent> {

    // Local event queue
    private final MyEventQueue myEventQueue;

    public MyEventTranslator(MyEventQueue myEventQueue) {
        this.myEventQueue = myEventQueue;
    }

    /**
     * Copies the next locally queued event into the ring-buffer slot.
     *
     * <p>Ring-buffer slots are reused, so every field is overwritten: the order ID is
     * cleared, and when the local queue is empty the remaining fields are cleared too,
     * so that no data from an earlier event leaks into this one.
     *
     * @param seckillGoodsEvent ring-buffer slot to populate
     * @param sequence          sequence number of the slot
     */
    @Override
    public void translateTo(SeckillGoodsEvent seckillGoodsEvent, long sequence) {
        // Take the next event from the local queue (null when the queue is empty).
        SeckillGoodsEvent event = (SeckillGoodsEvent) myEventQueue.getMyEventQueue().poll();
        if (Objects.nonNull(event)) {
            seckillGoodsEvent.setUserId(event.getUserId());
            seckillGoodsEvent.setGoodsId(event.getGoodsId());
            seckillGoodsEvent.setNum(event.getNum());
        } else {
            seckillGoodsEvent.setUserId(null);
            seckillGoodsEvent.setGoodsId(null);
            seckillGoodsEvent.setNum(null);
        }
        // The order ID belongs to whichever event previously occupied this slot.
        seckillGoodsEvent.setOrderId(null);
        LogUtil.info(sequence + " seckillGoodsEvent: " + JSON.toJSONString(seckillGoodsEvent));
    }
}
8. 创建Disruptor
package com.common.instance.test.disruptor;
import com.common.instance.test.disruptor.consumer.*;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import com.common.instance.test.disruptor.localQueue.MyEventQueue;
import com.common.instance.test.disruptor.publisher.MyEventTranslator;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import com.log.util.LogUtil;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.Executors;
@Component
public class SeckillDisruptor {
// 消费者:所有Handler处理异常记录
@Resource
private MyExceptionHandler myExceptionHandler;
// 消费者:创建订单
@Resource
private CreateOrderHandler createOrderHandler;
// 消费者:发送kafka
@Resource
private KafkaHandler kafkaHandler;
// 消费者:扣减库存
@Resource
private ReduceStockHandler reduceStockHandler;
// 消费者:移除正在处理的秒杀事件
@Resource
private RemoveProcessingQueueHandler removeProcessingQueueHandler;
@Resource
private MyEventQueue myEventQueue;
private Disruptor disruptor;
@PostConstruct
public void init(){
LogUtil.info("SeckillDisruptor init");
// 创建Disruptor
disruptor = createDisruptor();
// 启动
new Thread(new StartDisruptorThread()).start();
}
// 创建Disruptor
public Disruptor createDisruptor(){
// 创建Disruptor
Disruptor disruptor = new Disruptor(
// 预填充RingBuffer
new EventFactory(){
@Override
public Object newInstance() {
return new SeckillGoodsEvent();
}
},
// RingBuffer的大小(必须为2的N次方)
4096,
// 线程池处理
Executors.newWorkStealingPool(256),
// 生成类型:SINGLE(单生产者)、MULTI(多生产者)
ProducerType.SINGLE,
// 等待策略
new BlockingWaitStrategy());
// 消费异常处理
disruptor.setDefaultExceptionHandler(myExceptionHandler);
// 消费者
EventHandlerGroup handlerGroup = disruptor.handleEventsWith(createOrderHandler);
handlerGroup.then(kafkaHandler, reduceStockHandler);
handlerGroup.then(removeProcessingQueueHandler);
return disruptor;
}
class StartDisruptorThread implements Runnable {
@Override
public void run() {
disruptor.start();
while (true) {
try {
// 本地队列没有秒杀事件,休眠50ms
if (myEventQueue.getMyEventQueue().isEmpty()) {
// LogUtil.info("本地事件队列没有可处理秒杀事件" + Thread.currentThread().getName());
Thread.sleep(1 * 50);
continue;
}
// 生产者
MyEventTranslator myEventTranslator = new MyEventTranslator(myEventQueue);
disruptor.publishEvent(myEventTranslator);
} catch (Exception e) {
e.printStackTrace();
continue;
}
}
}
}
}
三、参考资料
package com.common.instance.test.disruptor.publisher; import com.alibaba.fastjson.JSON; import com.common.instance.test.disruptor.event.SeckillGoodsEvent; import com.common.instance.test.disruptor.localQueue.MyEventQueue; import com.lmax.disruptor.EventTranslator; import com.log.util.LogUtil; import java.util.Objects; public class MyEventTranslator implements EventTranslator{ // 本地事件队列 private MyEventQueue myEventQueue; public MyEventTranslator(MyEventQueue myEventQueue){ this.myEventQueue = myEventQueue; } @Override public void translateTo(SeckillGoodsEvent seckillGoodsEvent, long l) { // 从本地队列获取事件 SeckillGoodsEvent event = myEventQueue.getMyEventQueue().poll(); // 事件属性值赋值 if (Objects.nonNull(event)){ seckillGoodsEvent.setUserId(event.getUserId()); seckillGoodsEvent.setGoodsId(event.getGoodsId()); seckillGoodsEvent.setNum(event.getNum()); } LogUtil.info(l + " seckillGoodsEvent: " + JSON.toJSonString(seckillGoodsEvent)); } }
8. 创建Disruptor
package com.common.instance.test.disruptor;
import com.common.instance.test.disruptor.consumer.*;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import com.common.instance.test.disruptor.localQueue.MyEventQueue;
import com.common.instance.test.disruptor.publisher.MyEventTranslator;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import com.log.util.LogUtil;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.Executors;
@Component
public class SeckillDisruptor {
// 消费者:所有Handler处理异常记录
@Resource
private MyExceptionHandler myExceptionHandler;
// 消费者:创建订单
@Resource
private CreateOrderHandler createOrderHandler;
// 消费者:发送kafka
@Resource
private KafkaHandler kafkaHandler;
// 消费者:扣减库存
@Resource
private ReduceStockHandler reduceStockHandler;
// 消费者:移除正在处理的秒杀事件
@Resource
private RemoveProcessingQueueHandler removeProcessingQueueHandler;
@Resource
private MyEventQueue myEventQueue;
private Disruptor disruptor;
@PostConstruct
public void init(){
LogUtil.info("SeckillDisruptor init");
// 创建Disruptor
disruptor = createDisruptor();
// 启动
new Thread(new StartDisruptorThread()).start();
}
// 创建Disruptor
public Disruptor createDisruptor(){
// 创建Disruptor
Disruptor disruptor = new Disruptor(
// 预填充RingBuffer
new EventFactory(){
@Override
public Object newInstance() {
return new SeckillGoodsEvent();
}
},
// RingBuffer的大小(必须为2的N次方)
4096,
// 线程池处理
Executors.newWorkStealingPool(256),
// 生成类型:SINGLE(单生产者)、MULTI(多生产者)
ProducerType.SINGLE,
// 等待策略
new BlockingWaitStrategy());
// 消费异常处理
disruptor.setDefaultExceptionHandler(myExceptionHandler);
// 消费者
EventHandlerGroup handlerGroup = disruptor.handleEventsWith(createOrderHandler);
handlerGroup.then(kafkaHandler, reduceStockHandler);
handlerGroup.then(removeProcessingQueueHandler);
return disruptor;
}
class StartDisruptorThread implements Runnable {
@Override
public void run() {
disruptor.start();
while (true) {
try {
// 本地队列没有秒杀事件,休眠50ms
if (myEventQueue.getMyEventQueue().isEmpty()) {
// LogUtil.info("本地事件队列没有可处理秒杀事件" + Thread.currentThread().getName());
Thread.sleep(1 * 50);
continue;
}
// 生产者
MyEventTranslator myEventTranslator = new MyEventTranslator(myEventQueue);
disruptor.publishEvent(myEventTranslator);
} catch (Exception e) {
e.printStackTrace();
continue;
}
}
}
}
}
三、参考资料
Disruptor 系列(二)使用场景 - 走看看
disruptor模拟高速处理大规模订单类业务场景_congge-CSDN博客
SpringBoot - 并发框架Disruptor使用详解2(多生产者、多消费者、消费者依赖关系)
使用Disruptor完成多个消费者不重复消费消息_tianyaleixiaowu的专栏-CSDN博客_disruptor 多消费者



