栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

spring与disruptor集成的简单示例

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

spring与disruptor集成的简单示例

disruptor不过多介绍了,描述下当前的业务场景,两个应用A,B,应用 A 向应用 B 传递数据 . 数据传送比较快,如果用http直接push数据然后入库,效率不高.有可能导致A应用比较大的压力. 使用mq 太重量级,所以选择了disruptor. 也可以使用Reactor

baseQueueHelper.java


public abstract class baseQueueHelper, H extends WorkHandler> {

  
  private static List queueHelperList = new ArrayList();
  
  private Disruptor disruptor;
  
  private RingBuffer ringBuffer;
  
  private List initQueue = new ArrayList();

  
  protected abstract int getQueueSize();

  
  protected abstract EventFactory eventFactory();

  
  protected abstract WorkHandler[] getHandler();

  
  public void init() {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();
    disruptor = new Disruptor(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());
    disruptor.setDefaultExceptionHandler(new MyHandlerException());
    disruptor.handleEventsWithWorkerPool(getHandler());
    ringBuffer = disruptor.start();

    //初始化数据发布
    for (D data: initQueue) {
      ringBuffer.publishEvent(new EventTranslatorOneArg() {
 @Override
 public void translateTo(E event, long sequence, D data) {
   event.setValue(data);
 }
      }, data);
    }

    //加入资源清理钩子
    synchronized (queueHelperList) {
      if (queueHelperList.isEmpty()) {
 Runtime.getRuntime().addShutdownHook(new Thread() {
   @Override
   public void run() {
     for (baseQueueHelper baseQueueHelper : queueHelperList) {
baseQueueHelper.shutdown();
     }
   }
 });
      }
      queueHelperList.add(this);
    }
  }

  
  protected abstract WaitStrategy getStrategy();

  
  public synchronized void publishEvent(D data) {
    if (ringBuffer == null) {
      initQueue.add(data);
      return;
    }
    ringBuffer.publishEvent(new EventTranslatorOneArg() {
      @Override
      public void translateTo(E event, long sequence, D data) {
 event.setValue(data);
      }
    }, data);
  }

  
  public void shutdown() {
    disruptor.shutdown();
  }
}

EventFactory.java


public class EventFactory implements com.lmax.disruptor.EventFactory {

  @Override
  public SeriesDataEvent newInstance() {
    return new SeriesDataEvent();
  }
}

MyHandlerException.java

public class MyHandlerException implements ExceptionHandler {

  private Logger logger = LoggerFactory.getLogger(MyHandlerException.class);

  
  @Override
  public void handleEventException(Throwable ex, long sequence, Object event) {
    ex.printStackTrace();
    logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());
  }

  
  @Override
  public void handleonStartException(Throwable ex) {
    logger.error("start disruptor error ==[{}]!", ex.getMessage());
  }

  
  @Override
  public void handleonShutdownException(Throwable ex) {
    logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());
  }
}

SeriesData.java (代表应用A发送给应用B的消息)

public class SeriesData {
  private String deviceInfoStr;
  public SeriesData() {
  }

  public SeriesData(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }

  public String getDeviceInfoStr() {
    return deviceInfoStr;
  }

  public void setDeviceInfoStr(String deviceInfoStr) {
    this.deviceInfoStr = deviceInfoStr;
  }

  @Override
  public String toString() {
    return "SeriesData{" +
 "deviceInfoStr='" + deviceInfoStr + ''' +
 '}';
  }
}

SeriesDataEvent.java

public class SeriesDataEvent extends ValueWrapper {
}

SeriesDataEventHandler.java

public class SeriesDataEventHandler implements WorkHandler {
  private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);
  @Autowired
  private DeviceInfoService deviceInfoService;

  @Override
  public void onEvent(SeriesDataEvent event) {
    if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {
      logger.warn("receiver series data is empty!");
    }
    //业务处理
    deviceInfoService.processData(event.getValue().getDeviceInfoStr());
  }
}

SeriesDataEventQueueHelper.java

@Component
public class SeriesDataEventQueueHelper extends baseQueueHelper implements InitializingBean {
  private static final int QUEUE_SIZE = 1024;
  @Autowired
  private List seriesDataEventHandler;

  @Override
  protected int getQueueSize() {
    return QUEUE_SIZE;
  }

  @Override
  protected com.lmax.disruptor.EventFactory eventFactory() {
    return new EventFactory();
  }

  @Override
  protected WorkHandler[] getHandler() {
    int size = seriesDataEventHandler.size();
    SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]);
    return paramEventHandlers;
  }

  @Override
  protected WaitStrategy getStrategy() {
    return new BlockingWaitStrategy();
    //return new YieldingWaitStrategy();
  }

  @Override
  public void afterPropertiesSet() throws Exception {
    this.init();
  }
}

ValueWrapper.java

public abstract class ValueWrapper {
  private T value;
  public ValueWrapper() {}
  public ValueWrapper(T value) {
    this.value = value;
  }

  public T getValue() {
    return value;
  }

  public void setValue(T value) {
    this.value = value;
  }
}

DisruptorConfig.java

@Configuration
@ComponentScan(value = {"com.portal.disruptor"})
//多实例几个消费者
public class DisruptorConfig {

  
  @Bean
  public SeriesDataEventHandler smsParamEventHandler1() {
    return new SeriesDataEventHandler();
  }

  
  @Bean
  public SeriesDataEventHandler smsParamEventHandler2() {
    return new SeriesDataEventHandler();
  }

  
  @Bean
  public SeriesDataEventHandler smsParamEventHandler3() {
    return new SeriesDataEventHandler();
  }


  
  @Bean
  public SeriesDataEventHandler smsParamEventHandler4() {
    return new SeriesDataEventHandler();
  }

  
  @Bean
  public SeriesDataEventHandler smsParamEventHandler5() {
    return new SeriesDataEventHandler();
  }
}

测试

  //注入SeriesDataEventQueueHelper消息生产者
  @Autowired
  private SeriesDataEventQueueHelper seriesDataEventQueueHelper;

  @RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
  public DataResponseVo receiverDeviceData(@RequestBody String deviceData) {
    long startTime1 = System.currentTimeMillis();

    if (StringUtils.isEmpty(deviceData)) {
      logger.info("receiver data is empty !");
      return new DataResponseVo(400, "failed");
    }
    seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData));
    long startTime2 = System.currentTimeMillis();
    logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, startTime2 - startTime1);
    return new DataResponseVo(200, "success");
  }

应用A通过/data 接口把数据发送到应用B ,然后通过seriesDataEventQueueHelper 把消息发给disruptor队列,消费者去消费,整个过程对不会堵塞应用A. 可接受消息丢失, 可以通过扩展SeriesDataEventQueueHelper来达到对disruptor队列的监控

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持考高分网。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/141550.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号