Disruptor 本身不做过多介绍，这里先描述当前的业务场景：有两个应用 A 和 B，应用 A 向应用 B 传递数据。数据传送频率较高，如果用 HTTP 直接 push 数据再同步入库，效率不高，还可能给应用 A 带来较大的压力。使用 MQ 又太重量级，所以选择了 Disruptor（也可以使用 Reactor）。
baseQueueHelper.java
public abstract class baseQueueHelper, H extends WorkHandler > { private static List queueHelperList = new ArrayList (); private Disruptor disruptor; private RingBuffer ringBuffer; private List initQueue = new ArrayList (); protected abstract int getQueueSize(); protected abstract EventFactory eventFactory(); protected abstract WorkHandler[] getHandler(); public void init() { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build(); disruptor = new Disruptor (eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy()); disruptor.setDefaultExceptionHandler(new MyHandlerException()); disruptor.handleEventsWithWorkerPool(getHandler()); ringBuffer = disruptor.start(); //初始化数据发布 for (D data: initQueue) { ringBuffer.publishEvent(new EventTranslatorOneArg () { @Override public void translateTo(E event, long sequence, D data) { event.setValue(data); } }, data); } //加入资源清理钩子 synchronized (queueHelperList) { if (queueHelperList.isEmpty()) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { for (baseQueueHelper baseQueueHelper : queueHelperList) { baseQueueHelper.shutdown(); } } }); } queueHelperList.add(this); } } protected abstract WaitStrategy getStrategy(); public synchronized void publishEvent(D data) { if (ringBuffer == null) { initQueue.add(data); return; } ringBuffer.publishEvent(new EventTranslatorOneArg () { @Override public void translateTo(E event, long sequence, D data) { event.setValue(data); } }, data); } public void shutdown() { disruptor.shutdown(); } }
EventFactory.java
/**
 * Disruptor event factory that creates empty {@link SeriesDataEvent} instances.
 *
 * <p>The class shares its simple name with the Disruptor interface, which is why the
 * interface is referenced by its fully qualified name.
 */
public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {

    /** @return a new event with no value set */
    @Override
    public SeriesDataEvent newInstance() {
        return new SeriesDataEvent();
    }
}
MyHandlerException.java
/**
 * Disruptor exception handler that logs failures instead of propagating them.
 *
 * <p>Each log call passes the throwable as the final argument, after the placeholder
 * arguments, so SLF4J records the full stack trace along with the formatted message.
 */
public class MyHandlerException implements ExceptionHandler<Object> {

    private static final Logger logger = LoggerFactory.getLogger(MyHandlerException.class);

    /**
     * Logs an exception thrown while a handler was processing an event.
     *
     * @param ex       the exception raised by the handler
     * @param sequence the ring-buffer sequence of the failing event
     * @param event    the event being processed; logged via SLF4J formatting, so a null
     *                 event does not cause a secondary failure
     */
    @Override
    public void handleEventException(Throwable ex, long sequence, Object event) {
        logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]",
                sequence, event, ex.getMessage(), ex);
    }

    /**
     * Logs an exception raised while the Disruptor was starting.
     *
     * @param ex the start-up failure
     */
    @Override
    public void handleOnStartException(Throwable ex) {
        logger.error("start disruptor error ==[{}]!", ex.getMessage(), ex);
    }

    /**
     * Logs an exception raised while the Disruptor was shutting down.
     *
     * @param ex the shutdown failure
     */
    @Override
    public void handleOnShutdownException(Throwable ex) {
        logger.error("shutdown disruptor error ==[{}]!", ex.getMessage(), ex);
    }
}
SeriesData.java (代表应用A发送给应用B的消息)
/**
 * Message sent from application A to application B: a single device-info string.
 */
public class SeriesData {

    private String deviceInfoStr;

    /** Creates a message with no device info set. */
    public SeriesData() {
    }

    /**
     * @param deviceInfoStr the raw device information carried by this message
     */
    public SeriesData(String deviceInfoStr) {
        this.deviceInfoStr = deviceInfoStr;
    }

    /** @return the raw device information, or {@code null} if none was set */
    public String getDeviceInfoStr() {
        return deviceInfoStr;
    }

    /** @param deviceInfoStr the raw device information to carry */
    public void setDeviceInfoStr(String deviceInfoStr) {
        this.deviceInfoStr = deviceInfoStr;
    }

    /** @return a debug representation of the form {@code SeriesData{deviceInfoStr='...'}} */
    @Override
    public String toString() {
        // The single quote must be escaped inside a char literal.
        return "SeriesData{" +
                "deviceInfoStr='" + deviceInfoStr + '\'' +
                '}';
    }
}
SeriesDataEvent.java
/**
 * Ring-buffer event that wraps one {@link SeriesData} payload.
 */
public class SeriesDataEvent extends ValueWrapper<SeriesData> {
}
SeriesDataEventHandler.java
public class SeriesDataEventHandler implements WorkHandler{ private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class); @Autowired private DeviceInfoService deviceInfoService; @Override public void onEvent(SeriesDataEvent event) { if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) { logger.warn("receiver series data is empty!"); } //业务处理 deviceInfoService.processData(event.getValue().getDeviceInfoStr()); } }
SeriesDataEventQueueHelper.java
@Component public class SeriesDataEventQueueHelper extends baseQueueHelperimplements InitializingBean { private static final int QUEUE_SIZE = 1024; @Autowired private List seriesDataEventHandler; @Override protected int getQueueSize() { return QUEUE_SIZE; } @Override protected com.lmax.disruptor.EventFactory eventFactory() { return new EventFactory(); } @Override protected WorkHandler[] getHandler() { int size = seriesDataEventHandler.size(); SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]); return paramEventHandlers; } @Override protected WaitStrategy getStrategy() { return new BlockingWaitStrategy(); //return new YieldingWaitStrategy(); } @Override public void afterPropertiesSet() throws Exception { this.init(); } }
ValueWrapper.java
/**
 * Mutable holder for a single value; the base class for ring-buffer events.
 *
 * @param <T> type of the wrapped value
 */
public abstract class ValueWrapper<T> {

    private T value;

    /** Creates a wrapper with no value set. */
    public ValueWrapper() {
    }

    /** @param value the initial value */
    public ValueWrapper(T value) {
        this.value = value;
    }

    /** @return the wrapped value, or {@code null} if none has been set */
    public T getValue() {
        return value;
    }

    /** @param value the value to wrap, replacing any previous one */
    public void setValue(T value) {
        this.value = value;
    }
}
DisruptorConfig.java
/**
 * Spring configuration for the Disruptor queue.
 *
 * <p>Scans {@code com.portal.disruptor} for components and declares several
 * {@link SeriesDataEventHandler} beans, i.e. multiple consumer instances.
 */
@Configuration
@ComponentScan("com.portal.disruptor")
public class DisruptorConfig {

    /** @return consumer instance 1 */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler1() {
        return new SeriesDataEventHandler();
    }

    /** @return consumer instance 2 */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler2() {
        return new SeriesDataEventHandler();
    }

    /** @return consumer instance 3 */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler3() {
        return new SeriesDataEventHandler();
    }

    /** @return consumer instance 4 */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler4() {
        return new SeriesDataEventHandler();
    }

    /** @return consumer instance 5 */
    @Bean
    public SeriesDataEventHandler smsParamEventHandler5() {
        return new SeriesDataEventHandler();
    }
}
测试
//注入SeriesDataEventQueueHelper消息生产者 @Autowired private SeriesDataEventQueueHelper seriesDataEventQueueHelper; @RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE) public DataResponseVoreceiverDeviceData(@RequestBody String deviceData) { long startTime1 = System.currentTimeMillis(); if (StringUtils.isEmpty(deviceData)) { logger.info("receiver data is empty !"); return new DataResponseVo (400, "failed"); } seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData)); long startTime2 = System.currentTimeMillis(); logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, startTime2 - startTime1); return new DataResponseVo (200, "success"); }
应用 A 通过 /data 接口把数据发送到应用 B，应用 B 再通过 seriesDataEventQueueHelper 把消息发布到 Disruptor 队列，由消费者去消费，整个过程都不会堵塞应用 A。本场景可以接受消息丢失；还可以通过扩展 SeriesDataEventQueueHelper 来实现对 Disruptor 队列的监控。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。



