栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

设计模式之管道模式

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

设计模式之管道模式

简介

管道模式(Pipeline Pattern) 是责任链模式的常用变体之一。在该模式中,管道扮演的是流水线的角色,负责将数据传递到一个加工处理序列中,数据在每个步骤中被加工处理后,传递到下一个步骤进行加工处理,直到全部步骤处理完毕。

场景

适用于业务操作由多个步骤组成而成,特别是后期可能在任意位置对子步骤进行增删改。本博,将继续模拟前文责任链模式的业务场景,如下

运用 上下文基类
@Getter
@Setter
public class PipelineContext{

  
  private LocalDateTime startTime;

  
  private LocalDateTime endTime;

  
  public String getName() {
    return this.getClass().getSimpleName();
  }
}
审核业务上下文
@Getter
@Setter
public class AuditOrderContext extends PipelineContext {

  
  private String tradeFlowNo;

  
  private String content;

  
  private Long auditOrderId;


  @Override
  public String getName() {
    return "审核单处理";
  }
}
上下文处理器
public interface PipelineHandler {

  
  boolean handle(T context);
}
审核单前置处理
@Component
@Slf4j
public class BeforeAuditPipelineHandler implements PipelineHandler {

  @Override
  public boolean handle(AuditOrderContext context) {
    log.info("tradeFlowNo:{},审核前置开始了.",context.getTradeFlowNo());
    if (StringUtils.isEmpty(context.getContent())){
      log.error("The content must is not null");
      // 结束流程
      return false;
    }
    return true;
  }
}
审核单初始化处理
@Component
@Slf4j
public class InitAuditPipelineHandler implements PipelineHandler {

  @Override
  public boolean handle(AuditOrderContext context) {
    log.info("tradeFlowNo:{},审核初始化开始了.", context.getTradeFlowNo());
    if (StringUtils.isEmpty(context.getAuditOrderId())) {
      log.error("审核单初始化失败");
      // 结束流程
      return false;
    }
    return true;
  }
}
审核单后置处理
@Component
@Slf4j
public class PostAuditPipelineHandler implements PipelineHandler {

  @Override
  public boolean handle(AuditOrderContext context) {
    log.info("tradeFlowNo:{},审核后置处理开始了.", context.getTradeFlowNo());
    // todo 业务逻辑
    return true;
  }
}
管道路由配置

类似于前文中,把相关的管道注册到容器中,模拟一个注册表功能。

@Configuration
@Slf4j
public class PipelineRouteConfig implements ApplicationContextAware {

  private ApplicationContext appContext;

  
  private static final Map, List>>>
      PIPELINE_ROUTE_MAP = new HashMap<>(4);

  
  static {
    log.info("管道列表初始化开始...");
    PIPELINE_ROUTE_MAP.put(AuditOrderContext.class,
        Arrays.asList(
            BeforeAuditPipelineHandler.class,
            InitAuditPipelineHandler.class,
            PostAuditPipelineHandler.class
        ));
  }

  
  @Bean("pipelineRouteMap")
  public Map, List>> getHandlerPipelineMap() {
    return PIPELINE_ROUTE_MAP.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, this::toPipeline));
  }

  
  private List> toPipeline(
      Map.Entry, List>>> entry) {
    return entry.getValue().stream().map(appContext::getBean).collect(Collectors.toList());
  }

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    appContext = applicationContext;
  }
}
管道执行器
@Component
@Slf4j
public class PipelineExecutor {

  
  @Resource
  private Map, List>> pipelineRouteMap;


  
  public void acceptAsync(PipelineContext context, BiConsumer callback) {
    CompletableFuture.runAsync(()->{
      boolean success = acceptSync(context);
      if (callback != null) {
        callback.accept(context, success);
      }
    });
  }

  
  public boolean acceptSync(PipelineContext context) {
    Objects.requireNonNull(context, "上下文数据不能为 null");
    // 拿到数据类型
    Class dataType = context.getClass();
    // 获取数据处理管道
    List> pipeline = pipelineRouteMap.get(dataType);

    if (CollectionUtils.isEmpty(pipeline)) {
      log.error("{} 的管道为空", dataType.getSimpleName());
      return false;
    }

    // 管道是否畅通
    boolean lastSuccess = true;
    for (PipelineHandler handler : pipeline) {
      try {
        // 当前处理器处理数据,并返回是否继续向下处理
        lastSuccess = handler.handle(context);
      } catch (Throwable ex) {
        lastSuccess = false;
        log.error("[{}] 处理异常,handler:{}", context.getName(), handler.getClass().getSimpleName(), ex);
      }
      // 不再向下处理
      if (!lastSuccess) { break; }
    }
    return lastSuccess;
  }
}
单元测试 失败案例
@Slf4j
public class AuditPipelineTest extends BaseTest {

  @Resource
  private PipelineExecutor pipelineExecutor;

  @Test
  public void pipelineTest(){
    AuditOrderContext auditOrderContext = new AuditOrderContext();
    auditOrderContext.setTradeFlowNo("OID_202205092148_11000_01_123");
    auditOrderContext.setContent("这是审核内容撒!");
 
    boolean flag = pipelineExecutor.acceptSync(auditOrderContext);
    if (flag){
      log.info("审核单号:{},执行通道成功。",auditOrderContext.getTradeFlowNo());
    }
  }
}

成功案例
@Slf4j
public class AuditPipelineTest extends BaseTest {

  @Resource
  private PipelineExecutor pipelineExecutor;

  @Test
  public void pipelineTest(){
    AuditOrderContext auditOrderContext = new AuditOrderContext();
    auditOrderContext.setTradeFlowNo("OID_202205092148_11000_01_123");
    auditOrderContext.setContent("这是审核内容撒!");
    auditOrderContext.setAuditOrderId(1000L);
    boolean flag = pipelineExecutor.acceptSync(auditOrderContext);
    if (flag){
      log.info("审核单号:{},执行通道成功。",auditOrderContext.getTradeFlowNo());
    }
  }
}

总结
  • 通过管道模式,降低了系统的耦合度,提升了内聚程度与扩展性;
  • 职责单一分明,PipelineExecutor 只做执行工作,不用关心具体的管道细节;
  • 每个 PipelineHandler独立,只负责自己的业务逻辑实现;
  • 新增、删除或者交换子步骤时,都只需要操作路由表的配置,而不需要修改原来的调用代码。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/873880.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号