消费者在真正发起对provider的调用之前,会先经过Cluster层,里面就是我们常说的集群容错方案。
从Dubbo整体设计图上来看(参考:https://dubbo.apache.org/zh/docsv2.7/dev/design/ ),集群容错层位于以下位置:
为什么会需要容错方案呢?
当消费者在调用provider(一般会有多个提供者)时,有可能因为网络或其他原因导致失败,这时,框架需要能捕获到对应异常,并重新发起对其他provider的调用。
这个就被称为集群容错方案。下面一起来看下Dubbo提供的那些容错方案。
1.集群容错方案概览Dubbo Cluster接口有一系列的实现类,如下图所示:
我们找几个重点的容错方案来了解下
注意:集群容错方案的切换可以通过在接口上设置,如下所示
2.FailoverCluster (失败重试,默认方案) 2.1 适用场景
失败重试方案,当consumer调用provider失败后,会自动切换到其他的provider服务器进行重试(默认重试次数为2)。
通常这种方案适用于:读操作或者具有幂等的写操作。
2.2 源码解析public class FailoverCluster extends AbstractCluster {
// 这里的NAME,就是上面在中配置的cluster属性值
public final static String NAME = "failover";
@Override
public AbstractClusterInvoker doJoin(Directory directory) throws RpcException {
// 重点在这里
return new FailoverClusterInvoker<>(directory);
}
}
2.2.1 FailoverClusterInvoker
public class FailoverClusterInvokerextends AbstractClusterInvoker { public Result doInvoke(Invocation invocation, final List > invokers, LoadBalance loadbalance) throws RpcException { // 这里是所有的provider Invoker集合 List > copyInvokers = invokers; checkInvokers(copyInvokers, invocation); // 获取方法名称 String methodName = RpcUtils.getMethodName(invocation); // 获取对应方法的retry参数配置,默认为2,所以总共尝试调用provider的次数为3 int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List > invoked = new ArrayList >(copyInvokers.size()); // invoked invokers. Set providers = new HashSet (len); for (int i = 0; i < len; i++) { if (i > 0) { checkWhetherDestroyed(); copyInvokers = list(invocation); // check again checkInvokers(copyInvokers, invocation); } // 根据负载均衡策略,选择一个provider,这个策略后续专门说明下 Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { // 发起调用 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { ... } return result; } catch (RpcException e) { // 业务异常则直接抛出,不再重试 if (e.isBiz()) { // biz exception. throw e; } // 其他类型的异常则继续尝试 le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } ... } }
源码比较简单,就是获取retry重试次数参数后,针对所有的provider,通过负载均衡策略,选择一个合适的provider进行调用,非业务异常,则再次重试,直到retry次数后,如果还是失败,则直接抛出异常。
3.FailfastCluster(快速失败) 3.1 适用场景当Consumer调用provider失败后,直接抛出,就只调用一次。
这种比较适合那种非幂等性的写操作。
3.2 源码解析public class FailfastCluster extends AbstractCluster {
public final static String NAME = "failfast";
@Override
public AbstractClusterInvoker doJoin(Directory directory) throws RpcException {
return new FailfastClusterInvoker<>(directory);
}
}
3.2.1 FailfastClusterInvoker
public class FailfastClusterInvoker4.FailsafeCluster (安全失败) 4.1 适用场景extends AbstractClusterInvoker { public FailfastClusterInvoker(Directory directory) { super(directory); } @Override public Result doInvoke(Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); // 依旧按照策略选择一个provider Invoker Invoker invoker = select(loadbalance, invocation, invokers, null); try { // 执行调用 return invoker.invoke(invocation); } catch (Throwable e) { // 失败则直接抛出异常 if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception. throw (RpcException) e; } throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } } }
安全失败,当consumer调用provider失败时,直接忽略异常。比较适合写入审计日志等操作。
4.2 源码解析public class FailsafeCluster extends AbstractCluster {
public final static String NAME = "failsafe";
@Override
public AbstractClusterInvoker doJoin(Directory directory) throws RpcException {
return new FailsafeClusterInvoker<>(directory);
}
}
4.2.1 FailsafeClusterInvoker
public class FailsafeClusterInvoker5.FailbackCluster (失败自动恢复) 5.1 适用场景extends AbstractClusterInvoker { private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class); public FailsafeClusterInvoker(Directory directory) { super(directory); } @Override public Result doInvoke(Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); // 同上操作 Invoker invoker = select(loadbalance, invocation, invokers, null); return invoker.invoke(invocation); } catch (Throwable e) { // 不同之处在这里,失败了,也是直接返回Result,忽略该异常 logger.error("Failsafe ignore exception: " + e.getMessage(), e); return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore } } }
当调用出现异常时,在后台记录失败的请求,按照一定的策略进行自动重试。比较适合消息通知类的操作
5.2 源码解析public class FailbackCluster extends AbstractCluster {
public final static String NAME = "failback";
@Override
public AbstractClusterInvoker doJoin(Directory directory) throws RpcException {
return new FailbackClusterInvoker<>(directory);
}
}
5.2.1 FailbackClusterInvoker
public class FailbackClusterInvoker6.ForkingCluster(并行调用) 6.1 适用场景extends AbstractClusterInvoker { protected Result doInvoke(Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { Invoker invoker = null; try { // 如上调用 checkInvokers(invokers, invocation); invoker = select(loadbalance, invocation, invokers, null); return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e); // 失败后则添加到本地 addFailed(loadbalance, invocation, invokers, invoker); // 直接返回Result return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore } } private void addFailed(LoadBalance loadbalance, Invocation invocation, List > invokers, Invoker lastInvoker) { if (failTimer == null) { synchronized (this) { if (failTimer == null) { // 创建一个定时器 failTimer = new HashedWheelTimer( new NamedThreadFactory("failback-cluster-timer", true), 1, TimeUnit.SECONDS, 32, failbackTasks); } } } // 将当前重试作为一个Task,后续定时触发 RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD); try { failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS); } catch (Throwable e) { logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage()); } } }
当consumer发起调用时,会并行调用多个provider的服务(上面都是每次只调用某一个provider),只要其中一个成功即返回。
这种模式适用于:实时性要求比较高的读操作
6.2 源码解析public class ForkingCluster extends AbstractCluster {
public final static String NAME = "forking";
@Override
public AbstractClusterInvoker doJoin(Directory directory) throws RpcException {
return new ForkingClusterInvoker<>(directory);
}
}
6.2.1 ForkingClusterInvoker
public class ForkingClusterInvoker总结:extends AbstractClusterInvoker { public Result doInvoke(final Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); final List > selected; final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS); final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); // 选择forks个数个provider if (forks <= 0 || forks >= invokers.size()) { selected = invokers; } else { selected = new ArrayList<>(forks); while (selected.size() < forks) { Invoker invoker = select(loadbalance, invocation, invokers, selected); if (!selected.contains(invoker)) { //Avoid add the same invoker several times. selected.add(invoker); } } } RpcContext.getContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue
本文介绍了Dubbo Consumer常用的几种集群容错方案,我们可以在适当的场景选择合适的方案。
当然也可以自定义,自定义也比较简单(源于Dubbo的高可扩展性),只需要实现Cluster接口,然后创建文件将该实现类添加到指定文件即可。



