我们在平常业务开发或者深入了解一些框架实现时,经常会看到责任链模式的运用。责任链模式,通俗来讲,是若干个处理器组成一条链,当收到请求时,依次经过每个处理器,每个处理器只负责单独的一块逻辑。实现责任链模式有很多种方式,我们由浅入深,共同来探讨如何来实现责任链模式。
考虑一个场景,当我们接受一个用户请求,需要先做登录验证、权限验证、业务验证等许多验证,然后再做业务处理。我们首先想到的是将登录、权限、业务这些验证抽象成一个个过滤器,类似于这样:
public interface Filter {
void doFilter(Request request);
}
登录验证
public class LoginFilter implements Filter {
@Override
public void doFilter(Request request) {
System.out.println("执行登录验证...");
}
}
权限验证
public class AuthorityFilter implements Filter {
@Override
public void doFilter(Request request) {
System.out.println("执行权限验证...");
}
}
业务验证
public class BusinessFilter implements Filter {
@Override
public void doFilter(Request request) {
System.out.println("执行业务验证...");
}
}
然后用过滤器链组装所有的过滤器:
public class FilterChain implements Filter {
private List filters = new ArrayList<>();
@Override
public void doFilter(Request request) {
filters.forEach(filter -> filter.doFilter(request));
}
public FilterChain addFilter(Filter filter) {
filters.add(filter);
return this;
}
}
并且按照如下方式进行调用:
@org.junit.Test
public void test1() {
com.example.filterchain.v1.FilterChain chain = new com.example.filterchain.v1.FilterChain();
chain.addFilter(new com.example.filterchain.v1.LoginFilter())
.addFilter(new com.example.filterchain.v1.AuthorityFilter())
.addFilter(new com.example.filterchain.v1.BusinessFilter());
chain.doFilter(new Request());
}
测试结果:
这种方式是基于数组遍历依次调用每个处理器的filter方法来实现各种验证,但是它有一些缺陷:
不能控制过滤器的执行,只能被动执行所有过滤器的处理逻辑
没有返回值,不能在过滤器中加入对返回值的修改
因此我们优化Filter接口及实现:
public interface Filter {
void doFilter(Request request, Response response, FilterChain chain);
}
// 登录验证
public class LoginFilter implements Filter {
@Override
public void doFilter(Request request, Response response, FilterChain chain) {
System.out.println("执行登录验证...");
chain.doFilter(request, response);
System.out.println("登录-处理response...");
}
}
// 权限验证
public class AuthorityFilter implements Filter {
@Override
public void doFilter(Request request, Response response, FilterChain chain) {
System.out.println("执行权限验证...");
chain.doFilter(request, response);
System.out.println("权限-处理response...");
}
}
// 业务验证
public class BusinessFilter implements Filter {
@Override
public void doFilter(Request request, Response response, FilterChain chain) {
System.out.println("执行业务验证...");
chain.doFilter(request, response);
System.out.println("业务-处理response...");
}
}
过滤器链
public class FilterChain {
private List filters = new ArrayList<>();
private int index;
public void doFilter(Request request, Response response) {
if (index < filters.size()) {
filters.get(index++).doFilter(request, response, this);
}
}
public FilterChain addFilter(Filter filter) {
filters.add(filter);
return this;
}
}
调用:
@org.junit.Test
public void test2() {
com.example.filterchain.v2.FilterChain chain = new com.example.filterchain.v2.FilterChain();
chain.addFilter(new com.example.filterchain.v2.LoginFilter())
.addFilter(new com.example.filterchain.v2.AuthorityFilter())
.addFilter(new com.example.filterchain.v2.BusinessFilter());
chain.doFilter(new Request(), new Response());
}
上述方式中,我们在入参中加入了Response和FilterChain,这样在每个过滤器实现中我们可以控制后续的过滤器是否还需要继续执行,并且可以修改返回值。
测试结果:
上面这种方式也是Tomcat的过滤器模型:
只不过这里的FilterChain被抽取成接口。
以上两种方式都是借助FilterChain内部的数组来实现处理器的管理和逐个调用,我们现在来考虑一下,如果不通过数组,怎么把所有的处理器串联起来?
能不能借助链表,把下一个处理器作为入参传递呢?
public interface Filter {
Response handle(Handler handler, Request request);
}
public interface Handler {
Response handle(Request request);
}
这一次我们把业务处理器作为Handler单独提炼出来,新的Filter接口把下一个处理器作为入参,同时响应消息设置为了返回值。
具体的过滤器:
// 登录验证
public class LoginFilter implements Filter {
@Override
public Response handle(Handler handler, Request request) {
System.out.println("执行登录验证...");
return handler.handle(request);
}
}
// 权限验证
public class AuthorityFilter implements Filter {
@Override
public Response handle(Handler handler, Request request) {
System.out.println("执行权限验证...");
return handler.handle(request);
}
}
// 业务验证
public class BusinessFilter implements Filter {
@Override
public Response handle(Handler handler, Request request) {
System.out.println("执行业务验证...");
return handler.handle(request);
}
}
业务处理器:
public class DefaultHandler implements Handler{
@Override
public Response handle(Request request) {
System.out.println("This is default handler processing...");
return new Response();
}
}
接下来考虑怎么构建一个过滤器链,把过滤器和处理器衔接起来。我们可以借助一个过滤器接口的辅助类FilterNode来建立一个虚拟的处理器:
public class FilterNode implements Handler{
private Handler handler;
private Handler nextHandler;
private Filter filter;
public FilterNode(Handler handler, Handler nextHandler, Filter filter) {
this.handler = handler;
this.nextHandler = nextHandler;
this.filter = filter;
}
@Override
public Response handle(Request request) {
return filter.handle(nextHandler, request);
}
}
FilterNode实现了Handler接口,每次创建FilterNode时,都会把初始处理器、下一个处理器以及过滤器作为构造参数传入,每次执行handle方法时,直接调用filter的handle方法,运行过滤器的逻辑。然后构造一个handler执行链:
public class FilterChainBuilder {
private List filters = new ArrayList<>();
public FilterChainBuilder addFilter(Filter filter) {
filters.add(filter);
return this;
}
// 从最后一个过滤器开始,每个Filter构建一个FilterNode,形成handler,每个handler中都会持有下一个handler引用
public Handler buildFilterChain(Handler handler) {
Handler last = handler;
for (int i = filters.size() - 1; i >= 0; i--) {
last = new FilterNode(handler, last, filters.get(i));
}
return last;
}
}
做一下测试:
@org.junit.Test
public void test3() {
Handler defaultHandler = new DefaultHandler();
FilterChainBuilder builder = new FilterChainBuilder();
builder.addFilter(new com.example.filterchain.v3.LoginFilter())
.addFilter(new com.example.filterchain.v3.AuthorityFilter())
.addFilter(new com.example.filterchain.v3.BusinessFilter());
Handler handler = builder.buildFilterChain(defaultHandler);
handler.handle(new Request());
}
测试结果:
同时通过debug我们可以发现,最外层的handler通过层层引用,直到最后一层的handler为业务处理器。
以上方式在dubbo中有运用,dubbo中的过滤器都是通过这种方式附加在内部Invoker中:
这是dubbo 2.7.12版本中的实现,如果不借助FilterNode,我们也可以通过匿名内部类的方式来构造这种处理器链:
这是2.7.5版本中的实现,效果其实是一样的。
至此,我们已经探讨了3种责任链的实现方式,先做一个小小的总结:
方式1:基于循环遍历数组完成对每个处理器的逐个调用
方式2:基于处理器链参数(FilterChain)在每个处理器中完成对下一个处理器的调用
方式3:区分处理器与过滤器,在构建处理器时,增加对下一个处理器和当前过滤器的引用,在处理方法中执行过滤器处理。
在日常开发中,我们有些业务场景可能比较复杂,以上三种方式可能仍然不能满足需要。所以我们仍需更进一步的探索责任链实现方式,比如管道模式。
public interface TaskHandler {
default void executeTask(TaskHandlerContext ctx, DelegateTask task) {
ctx.fireTaskExecuted(task);
}
}
public interface TaskPipeline {
TaskPipeline fireTaskExecuted();
TaskPipeline addLast(TaskHandler handler);
}
@Component
@Scope("prototype")
public class TaskHandlerContext {
TaskHandlerContext prev;
TaskHandlerContext next;
TaskHandler handler;
public void fireTaskExecuted(DelegateTask task) {
invokeTaskExecuted(next(), task);
}
static void invokeTaskExecuted(TaskHandlerContext ctx, DelegateTask task) {
if (null != ctx)
ctx.handler().executeTask(ctx, task);
}
private TaskHandlerContext next() {
return next;
}
private TaskHandler handler() {
return handler;
}
}
这里我们声明了TaskHandler,TaskPipeline两个接口,同时创建了TaskHandlerContext,把TaskHandler包含在内,又增加了对前一个上下文和后一个上下文的指向,形成双向链表,这样,所有的
TaskHandlerContext组合在一起形成了一个管道,其实这与之前的FilterChain类似,只不过增加了上下文概念,可以附加更多的信息。图形看起来比较好理解:
默认管道实现类:
@Component
@Scope("prototype")
public class DefaultTaskPipeline implements TaskPipeline, ApplicationContextAware, InitializingBean {
private static final TaskHandler DEFAULT_TASK_HANDLER = new TaskHandler() {};
private ApplicationContext applicationContext;
private TaskHandlerContext head;
private TaskHandlerContext tail;
private DelegateTask task;
public DefaultTaskPipeline(DelegateTask task) {
this.task = task;
}
@Override
public TaskPipeline fireTaskExecuted() {
TaskHandlerContext.invokeTaskExecuted(head, task);
return this;
}
@Override
public TaskPipeline addLast(TaskHandler handler) {
TaskHandlerContext context = getContext(handler);
context.next = tail;
context.prev = tail.prev;
tail.prev.next = context;
tail.prev = context;
return null;
}
@Override
public void afterPropertiesSet() throws Exception {
head = getContext(DEFAULT_TASK_HANDLER);
tail = getContext(DEFAULT_TASK_HANDLER);
head.next = tail;
tail.prev = head;
}
private TaskHandlerContext getContext(TaskHandler taskHandler) {
TaskHandlerContext context = applicationContext.getBean(TaskHandlerContext.class);
context.handler = taskHandler;
return context;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
TaskHandler实现:
@Component
public class TaskHandler1 implements TaskHandler, Ordered {
@Override
public void executeTask(TaskHandlerContext ctx, DelegateTask task) {
System.out.println("TaskHandler1 processing...");
ctx.fireTaskExecuted(task);
}
@Override
public int getOrder() {
return 0;
}
}
@Component
public class TaskHandler2 implements TaskHandler, Ordered {
@Override
public void executeTask(TaskHandlerContext ctx, DelegateTask task) {
System.out.println("TaskHandler2 processing...");
ctx.fireTaskExecuted(task);
}
@Override
public int getOrder() {
return 1;
}
}
@Component
public class TaskHandler3 implements TaskHandler, Ordered {
@Override
public void executeTask(TaskHandlerContext ctx, DelegateTask task) {
System.out.println("TaskHandler3 processing...");
ctx.fireTaskExecuted(task);
}
@Override
public int getOrder() {
return 2;
}
}
这里把DefaultTaskPipeline和TaskHandlerContext都声明为原型bean,是想借助spring能力,每次获取时都重新创建新的实例,同时在初始化的扩展点中,搜索TaskHandler实现,把它们做排序,然后依次加入到管道中,当然,如果不想依赖spring,手动构建也是一样的,不过既然spring都为你提供好了一切,为啥放着这么好的工具不用是不是…
@Component
public class TaskHandlerBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
// spring bean初始化扩展点,每次从容器中获得 DefaultTaskPipeline 时,都会搜索TaskHandler实现,把它们做排序,然后依次加入到管道中
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DefaultTaskPipeline) {
DefaultTaskPipeline pipeline = (DefaultTaskPipeline) bean;
Map handlerMap = applicationContext.getBeansOfType(TaskHandler.class);
List handlerList = new ArrayList<>(handlerMap.values());
AnnotationAwareOrderComparator.sort(handlerList);
handlerList.forEach(handler -> pipeline.addLast(handler));
}
return bean;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
来做个测试:
@org.junit.Test
public void test4() {
// 扫描当前测试类所在包,创建spring容器
ApplicationContext ctx = new AnnotationConfigApplicationContext(
ClassUtils.getPackageName(Test.class.getName()));
TaskPipeline pipeline = ctx.getBean(TaskPipeline.class, new TaskEntity());
pipeline.fireTaskExecuted();
}
测试结果:
像这种管道模式,我们还可以在TaskHandler和TaskPipeline接口上做很多事情,比如某个处理器虽然加入到了管道中,但它可以基于某些逻辑进行过滤,所有处理器处理完毕后,我们也可以在尾节点触发某项动作,等等,按需扩展好了。
很多开源框架都用到管道模式,典型的像netty,tomcat。tomcat中的管道会稍有些不同,不过大致思路是一样的。
以上几种责任链实现方式由易到难,扩展性也由低到高,实际运用中,可结合具体业务场景进行选择。



