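Dependency:
com.netflix.hystrix:hystrix-core:1.5.18
What is Hystrix
The last version was released in November 2018. The project is now in maintenance mode and no further versions will be released.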
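Purpose:
- Stop cascading failures: fallbacks and graceful degradation, fail fast and recover quickly
- Real-time monitoring and real-time configuration changes
- Resource isolation, so that a partially unavailable dependency does not make the whole system unavailable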
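Scenario: a product-list endpoint needs to fetch red-packet (coupon), price, tag and other data. These calls can be given their own thread pool. Even if that pool is saturated, the service's other endpoints (those not related to the product list) are not affected.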
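The isolation idea in the scenario above can be sketched in plain JDK code. This is a minimal sketch and not Hystrix's own HystrixThreadPool; the `Bulkhead` class and its `fallback` parameter are hypothetical. Each group of calls gets a small, bounded pool. Once that pool is saturated, new work is rejected immediately and the caller receives a fallback, while a separate pool serving other endpoints keeps working.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical bulkhead: one small, bounded pool per group of dependency calls.
class Bulkhead {
    private final ThreadPoolExecutor pool;

    Bulkhead(int threads) {
        // SynchronousQueue has no waiting room, so a task is rejected
        // as soon as every thread in this pool is busy.
        this.pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy());
    }

    // Runs the task on this bulkhead's pool; if the pool is saturated,
    // returns an already-completed future holding the fallback value instead.
    <T> Future<T> submit(Callable<T> task, T fallback) {
        try {
            return pool.submit(task);
        } catch (RejectedExecutionException e) {
            return CompletableFuture.completedFuture(fallback);
        }
    }

    void shutdownNow() {
        pool.shutdownNow();
    }
}
```

In Hystrix itself, the same effect comes from letting the product-list commands share a dedicated thread pool rather than the pool used by other commands.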
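Underlying framework: Hystrix is built mainly on RxJava. For an introduction, see https://www.jianshu.com/p/5e93c9101dc5
Hystrix uses a command as its execution entry point. AbstractCommand implements almost all of the command logic and has two subclasses, HystrixCommand and HystrixObservableCommand.
HystrixCommand covers the vast majority of scenarios ("99%" by the author's estimate), so only this command class is covered below. It provides two methods: execute() for synchronous execution and queue() for asynchronous execution.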
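The relationship between the two methods can be modeled with a plain `ExecutorService`. This is a minimal sketch under stated assumptions. `MiniCommand` is a hypothetical class, not Hystrix's API, and it ignores Hystrix's one-execution-per-instance rule. `queue()` hands back a `Future` immediately, and `execute()` is nothing more than `queue().get()`, which blocks until the result arrives.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, simplified command; not Hystrix's HystrixCommand.
abstract class MiniCommand<R> {
    private final ExecutorService executor;

    MiniCommand(ExecutorService executor) {
        this.executor = executor;
    }

    // Business logic, the counterpart of HystrixCommand.run()
    protected abstract R run() throws Exception;

    // Asynchronous: returns a Future immediately
    public Future<R> queue() {
        Callable<R> task = this::run;
        return executor.submit(task);
    }

    // Synchronous: blocks on the Future returned by queue(), i.e. execute() == queue().get()
    public R execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

For example, `new MiniCommand<Integer>(Executors.newFixedThreadPool(2)) { protected Integer run() { return 42; } }.execute()` blocks and returns 42. Calling `queue()` on a fresh instance returns at once and lets the caller decide when to wait.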
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
    // ....... all constructors omitted
    // ....... static configuration inner class Setter omitted

    // The thread used for execution
    private final AtomicReference<Thread> executionThread = new AtomicReference<Thread>();
    private final AtomicBoolean interruptOnFutureCancel = new AtomicBoolean(false);

    protected abstract R run() throws Exception;

    protected R getFallback() {
        throw new UnsupportedOperationException("No fallback available.");
    }

    @Override
    protected boolean isFallbackUserDefined() {
        Boolean containsFromMap = commandContainsFallback.get(commandKey);
        if (containsFromMap != null) {
            return containsFromMap;
        } else {
            Boolean toInsertIntoMap;
            try {
                getClass().getDeclaredMethod("getFallback");
                toInsertIntoMap = true;
            } catch (NoSuchMethodException nsme) {
                toInsertIntoMap = false;
            }
            commandContainsFallback.put(commandKey, toInsertIntoMap);
            return toInsertIntoMap;
        }
    }

    @Override
    protected boolean commandIsScalar() {
        return true;
    }

    public R execute() {
        try {
            // queue() returns a Future, and get() blocks until the result is available,
            // which is why execute() is the synchronous variant.
            // execute() therefore also ends up calling queue().
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

    public Future<R> queue() {
        // toObservable(): executes the command asynchronously, with callbacks, by subscribing to an Observable
        final Future<R> delegate = toObservable().toBlocking().toFuture();

        final Future<R> f = new Future<R>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if (delegate.isCancelled()) {
                    return false;
                }
                if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                    interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                }
                final boolean res = delegate.cancel(interruptOnFutureCancel.get());
                if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                    final Thread t = executionThread.get();
                    if (t != null && !t.equals(Thread.currentThread())) {
                        // In the end, cancellation is done by calling interrupt()
                        t.interrupt();
                    }
                }
                return res;
            }

            @Override
            public boolean isCancelled() {
                return delegate.isCancelled();
            }

            @Override
            public boolean isDone() {
                return delegate.isDone();
            }

            @Override
            public R get() throws InterruptedException, ExecutionException {
                return delegate.get();
            }

            @Override
            public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                return delegate.get(timeout, unit);
            }
        };

        // The Future may already be done here (for example, the command was rejected immediately)
        if (f.isDone()) {
            try {
                f.get();
                return f;
            } catch (Exception e) {
                // Extract the exception that should be thrown
                Throwable t = decomposeException(e);
                if (t instanceof HystrixBadRequestException) {
                    return f;
                } else if (t instanceof HystrixRuntimeException) {
                    HystrixRuntimeException hre = (HystrixRuntimeException) t;
                    switch (hre.getFailureType()) {
                        case COMMAND_EXCEPTION:
                        case TIMEOUT:
                            // we don't throw these types from queue() only from queue().get() as they are execution errors
                            return f;
                        default:
                            // these are errors we throw from queue() as they as rejection type errors
                            throw hre;
                    }
                } else {
                    throw Exceptions.sneakyThrow(t);
                }
            }
        }

        return f;
    }
}
toObservable() method
As the code above shows, HystrixCommand exposes execute() and queue(), and both ultimately go through the toObservable() call inside queue(). toObservable() is analyzed below.
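Steps (the core method is applyHystrixSemantics()):
1. Check that the command state is NOT_STARTED, otherwise throw a HystrixRuntimeException. A CAS guarantees that the command instance is executed only once.
2. Record the execution in HystrixRequestLog (set requestLogEnabled = false to turn this logging off).
3. If request caching is enabled, take the data from the cache.
3.1 Caching is only in effect when requestCacheEnabled = true && getCacheKey() != null (so an overridden getCacheKey() must not return null, otherwise caching silently does nothing).
4. If caching is disabled or the cache misses, execute the target command to obtain the result.
4.1 Because of Observable.defer(), the target method does not run immediately; it runs only once the Observable is subscribed to.
4.2 applyHystrixSemantics() holds the most important logic for executing the target method.
5. If caching is enabled, put the result into the cache.
6. Return the result and register the related cleanup work on it:
6.1 terminateCommandCleanup: marks the command state as TERMINAL.
6.1.1 If the target code was not executed (for example, the result came from the cache): clears the timeout listener, records the execution latency, calls HystrixCommandMetrics#markCommandDone(), and fires the end-of-execution callback (if endCurrentThreadExecutingCommand is not null).
6.1.2 If the target code was executed: the same cleanup runs, marked with markCommandDone(true).
6.2 unsubscribeCommandCleanup marks the command state as UNSUBSCRIBED and fires executionHook.onUnsubscribe.
6.3 fireOnCompletedHook only fires executionHook.onSuccess.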
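The bookkeeping in the steps above can be modeled without RxJava. This is a minimal sketch and not Hystrix code. `OneShotCommand`, `newRequestCache()` and the `CompletableFuture`-based cache are hypothetical stand-ins for `commandState`, `HystrixRequestCache` and `HystrixCachedObservable`. It covers the one-shot CAS (step 1), caching only when the key is non-null (steps 3 and 5), registering the pending result with `putIfAbsent` before running so a racing caller reuses it, and cleanup guarded by a CAS so it runs exactly once (step 6).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical, RxJava-free model of the bookkeeping in toObservable(); not Hystrix code.
class OneShotCommand {
    enum State { NOT_STARTED, CHAIN_CREATED, TERMINAL }

    private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
    private final AtomicInteger cleanups = new AtomicInteger();
    private final String cacheKey; // null disables request caching, like a null getCacheKey()
    private final ConcurrentMap<String, CompletableFuture<String>> requestCache;
    private final Supplier<String> target;
    private boolean responseFromCache;

    OneShotCommand(String cacheKey, ConcurrentMap<String, CompletableFuture<String>> requestCache,
                   Supplier<String> target) {
        this.cacheKey = cacheKey;
        this.requestCache = requestCache;
        this.target = target;
    }

    // Hypothetical per-request cache, shared by the commands of one request
    static ConcurrentMap<String, CompletableFuture<String>> newRequestCache() {
        return new ConcurrentHashMap<>();
    }

    String execute() {
        // Step 1: an instance may run only once (CAS NOT_STARTED -> CHAIN_CREATED)
        if (!state.compareAndSet(State.NOT_STARTED, State.CHAIN_CREATED)) {
            throw new IllegalStateException("This instance can only be executed once.");
        }
        try {
            if (cacheKey == null) {
                return target.get(); // step 4 without caching
            }
            // Step 3: cache lookup
            CompletableFuture<String> fromCache = requestCache.get(cacheKey);
            if (fromCache != null) {
                responseFromCache = true;
                return fromCache.join();
            }
            // Step 5: register our pending result before running, so a racing caller shares it
            CompletableFuture<String> toCache = new CompletableFuture<>();
            CompletableFuture<String> raced = requestCache.putIfAbsent(cacheKey, toCache);
            if (raced != null) { // another caller won the race: use its value
                responseFromCache = true;
                return raced.join();
            }
            try {
                String result = target.get(); // step 4: run the target
                toCache.complete(result);
                return result;
            } catch (RuntimeException e) {
                toCache.completeExceptionally(e);
                throw e;
            }
        } finally {
            // Step 6: cleanup guarded by a CAS so it runs exactly once
            if (state.compareAndSet(State.CHAIN_CREATED, State.TERMINAL)) {
                cleanups.incrementAndGet();
            }
        }
    }

    boolean isResponseFromCache() { return responseFromCache; }

    int cleanupCount() { return cleanups.get(); }
}
```

Two commands built with the same `"sku-1"` key and the same request cache run the target once: the second reports `isResponseFromCache() == true`. Executing either instance again throws `IllegalStateException`.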
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    public Observable<R> toObservable() {
        // The preceding Action/Func definitions are omitted
        // Create an Observable via Observable.defer()
        return Observable.defer(() -> {
            // This is a stateful object, so it can only be used once (the CAS lets execution enter only once)
            if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                //TODO make a new error type for this
                throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
            }

            // Record the command start time
            commandStartTimestamp = System.currentTimeMillis();

            // Check whether request logging is enabled
            if (properties.requestLogEnabled().get()) {
                // log this command execution regardless of what happened
                if (currentRequestLog != null) {
                    currentRequestLog.addExecutedCommand(_cmd);
                }
            }

            // Whether request caching is enabled
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            // -- cacheKey is null by default, so caching is off unless a key is provided
            final String cacheKey = getCacheKey();

            // If caching is enabled, try the cache first
            if (requestCacheEnabled) {
                // Read from the cache
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    // Cache hit: set isResponseFromCache = true and return the cached data
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }

            // Wrap into hystrixObservable: the Observable that executes the command
            Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics)
                    .map(wrapWithAllOnNextHooks);

            // The Observable after caching
            Observable<R> afterCache;

            // Whether to put it into the cache (only when cacheKey is not null; it is null by default and must be set explicitly)
            // put in cache
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }

            return afterCache
                    // Called when the subscription is about to terminate, whether normally or with an error
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    // Called on unsubscribe
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    // Called when the Observable completes normally
                    .doOnCompleted(fireOnCompletedHook);
        });
    }
}
applyHystrixSemantics() method
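Executing the target method ultimately goes through applyHystrixSemantics().
The steps are:
- Ask the circuit breaker whether execution is allowed (circuitBreaker.allowRequest()). If not, go straight to the short-circuit fallback, handleShortCircuitViaFallback().
- Try to acquire the execution semaphore. In thread-pool isolation mode TryableSemaphoreNoOp is used, whose tryAcquire() always returns true. If the semaphore cannot be acquired, the semaphore-rejection fallback handleSemaphoreRejectionViaFallback() runs.
- Execute the target method with executeCommandAndObserve(). If execution fails, the fallback is handled inside executeCommandAndObserve() by its handleFallback function.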
abstract class AbstractCommand<R> {
    // ......... everything else omitted

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // Mark the start before executing
        // The source has many executionHook / eventNotifier calls. They are one of Hystrix's extension points:
        // by default they do nothing and simply leave a hook that developers can plug into
        executionHook.onStart(_cmd);

        // Decide whether the circuit breaker allows execution
        // -- circuit breaker enabled (withCircuitBreakerEnabled(true)): HystrixCircuitBreakerImpl
        // -- circuit breaker disabled (withCircuitBreakerEnabled(false)): NoOpCircuitBreaker, which always returns true
        if (circuitBreaker.allowRequest()) {
            // Get the execution semaphore. If semaphore isolation is not configured, this returns TryableSemaphoreNoOp.DEFAULT
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = () -> {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    // For TryableSemaphoreNoOp.DEFAULT this is a no-op
                    executionSemaphore.release();
                }
            };
            final Action1<Throwable> markExceptionThrown = t -> eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);

            // Check for semaphore rejection
            // In thread-pool mode TryableSemaphoreNoOp is used, so tryAcquire() always returns true
            if (executionSemaphore.tryAcquire()) {
                try {
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    // Key point: executeCommandAndObserve() applies the isolation strategy and the various fallbacks,
                    // and it is what ultimately executes the target method
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                // Semaphore not acquired: semaphore-rejection fallback
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            // Reaching this branch means the circuit breaker rejects the request, so the fallback runs
            return handleShortCircuitViaFallback();
        }
    }
}
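The order of checks in applyHystrixSemantics() can be condensed into a plain-Java sketch. This is hypothetical code, not Hystrix's API. `SemanticsGate`, the `BooleanSupplier` standing in for `circuitBreaker.allowRequest()`, and the string prefixes are all illustrative. For brevity it also folds the execution-failure fallback, which Hystrix handles inside executeCommandAndObserve(), into the same method.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical model of the checks in applyHystrixSemantics(); not Hystrix code.
class SemanticsGate {
    private final BooleanSupplier circuitAllowsRequest; // stands in for circuitBreaker.allowRequest()
    private final Semaphore semaphore;                   // null models thread isolation (TryableSemaphoreNoOp)

    SemanticsGate(BooleanSupplier circuitAllowsRequest, Semaphore semaphore) {
        this.circuitAllowsRequest = circuitAllowsRequest;
        this.semaphore = semaphore;
    }

    String apply(Supplier<String> target, Supplier<String> fallback) {
        // 1. Circuit breaker check
        if (!circuitAllowsRequest.getAsBoolean()) {
            return "short-circuited: " + fallback.get();
        }
        // 2. Semaphore check (always passes when there is no semaphore)
        if (semaphore != null && !semaphore.tryAcquire()) {
            return "semaphore-rejected: " + fallback.get();
        }
        AtomicBoolean released = new AtomicBoolean(false);
        Runnable releaseOnce = () -> {
            if (semaphore != null && released.compareAndSet(false, true)) {
                semaphore.release();
            }
        };
        try {
            // 3. Execute the target; a failure goes to the fallback
            return target.get();
        } catch (RuntimeException e) {
            return "failed: " + fallback.get();
        } finally {
            // Both the "terminate" and the "unsubscribe" cleanup may fire;
            // the CAS turns the second release into a no-op, like singleSemaphoreRelease
            releaseOnce.run();
            releaseOnce.run();
        }
    }
}
```

The double `releaseOnce.run()` is deliberate. It imitates doOnTerminate and doOnUnsubscribe both invoking `singleSemaphoreRelease`, and the permit count stays correct because only the first call releases.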
Executing the target method
The core method for executing the target method is executeCommandAndObserve().
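Before reading the source, here is a condensed model of what thread isolation plus a timeout amounts to. It is a sketch that swaps in a different technique: Hystrix enforces the timeout with HystrixObservableTimeoutOperator and a timer, whereas this version simply uses `Future.get(timeout)`. `ThreadIsolatedCall` is a hypothetical name. The user code runs on a separate pool, and a rejection, a timeout or an exception each lead to the fallback. On timeout the worker is interrupted, which loosely mirrors executionIsolationThreadInterruptOnTimeout.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical sketch: run user code on an isolated pool and enforce a timeout.
class ThreadIsolatedCall {
    static <R> R call(ExecutorService pool, Callable<R> run, long timeoutMs, Supplier<R> fallback) {
        Future<R> future;
        try {
            future = pool.submit(run);
        } catch (RejectedExecutionException e) {
            return fallback.get(); // thread-pool rejected
        }
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);   // interrupt the worker thread
            return fallback.get(); // timed out
        } catch (ExecutionException e) {
            return fallback.get(); // run() threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback.get();
        }
    }
}
```

Keeping the caller's thread free of the slow call is the point of thread isolation. The price is a thread hand-off per call, which is why Hystrix also offers semaphore isolation.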
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        // Execution context: makes the main thread's request context available inside the thread pool as well
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

        final Action1<R> markEmits = r -> {
            // Whether to report data at the onNext step
            // HystrixCommand -> false
            // HystrixObservableCommand -> true
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
            // Whether the command is scalar
            // HystrixCommand -> true
            // HystrixObservableCommand -> false
            if (commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                // These lines are the key part:
                // record the result as SUCCESS,
                // and call circuitBreaker.markSuccess() (if the circuit breaker was open, this closes it)
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                circuitBreaker.markSuccess();
            }
        };

        final Action0 markOnCompleted = () -> {
            if (!commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                // Same as markEmits above, but only invoked when commandIsScalar() == false, i.e. for HystrixObservableCommand:
                // record the result as SUCCESS,
                // and call circuitBreaker.markSuccess() (if the circuit breaker was open, this closes it)
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                circuitBreaker.markSuccess();
            }
        };

        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                // Convert Throwable t into Exception e (wrapping it if it is not an Exception).
                // For example, if t is an NPE, t and e are the same object;
                // t and e differ only when t is an Error
                Exception e = getExceptionFromThrowable(t);
                // Record the exception raised during execution
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    // The thread pool rejected the task
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    // The target method timed out
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    // Bad request: emit the error without a fallback (covered in detail later)
                    return handleBadRequestByEmittingError(e);
                } else {
                    // Catch-all: only reachable when a subclass overrides getExceptionFromThrowable()
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }
                    return handleFailureViaFallback(e);
                }
            }
        };

        final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
            @Override
            public void call(Notification<? super R> rNotification) {
                setRequestContextIfNeeded(currentRequestContext);
            }
        };

        Observable<R> execution;
        // The key part is executeCommandWithSpecifiedIsolation()
        if (properties.executionTimeoutEnabled().get()) {
            // Timeout support enabled: adds a .lift(new HystrixObservableTimeoutOperator<R>(_cmd)) call
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            // Timeout support disabled
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        // With execution in hand, register the basic event callbacks and observers
        // -- doOnNext(): invoked before the observer is called back (the data has already been emitted, i.e. the target method has run)
        return execution.doOnNext(markEmits)
                // -- doOnCompleted(): invoked on normal completion
                .doOnCompleted(markOnCompleted)
                // -- onErrorResumeNext(): invoked when execution fails
                .onErrorResumeNext(handleFallback)
                // -- doOnEach(): invoked for every notification; sets the request context on the worker thread for cross-thread propagation
                .doOnEach(setRequestContext);
    }

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            // Thread-pool isolation (the default)
            // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    // Mark that execution has occurred and record it
                    executionResult = executionResult.setExecutionOccurred();
                    // Execution is only allowed while the command state is OBSERVABLE_CHAIN_CREATED,
                    // the state set earlier by toObservable()
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }
                    // Collect metrics: execution started
                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                    // run() has not executed yet, but the command already timed out during the thread switch: return an error immediately
                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        // the command timed out in the wrapping thread so we will return immediately
                        // and not increment any of the counters below or other such logic
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    // CAS the thread state to ThreadState.STARTED
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        //we have not been unsubscribed, so should proceed
                        // Increment the global concurrent-thread counter (semaphore mode does not need it)
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        // Mark that a pool thread is about to start executing
                        threadPool.markThreadExecution();
                        // store the command that is being run
                        // It is kept in a ThreadLocal bound to the current thread,
                        // which keeps command execution thread-safe
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        try {
                            // Run the hooks, then the target run() method
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            // getUserExecutionObservable(): obtains the target method through the abstract getExecutionObservable()
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        //command has already been unsubscribed, so return immediately
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            }).doOnTerminate(() -> {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }).doOnUnsubscribe(() -> {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }).subscribeOn(threadPool.getScheduler(() -> properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT));
        } else {
            // Semaphore isolation
            return Observable.defer(() -> {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                // semaphore isolated
                // store the command that is being run
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                try {
                    executionHook.onRunStart(_cmd);
                    executionHook.onExecutionStart(_cmd);
                    return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                } catch (Throwable ex) {
                    //If the above hooks throw, then use that as the result of the run method
                    return Observable.error(ex);
                }
            });
        }
    }
}
Method fallback
That covers how the target method is invoked. Next, let's look at how fallback (degradation) is handled.
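When fallback happens
Hystrix triggers the fallback logic in 5 cases:
- short-circuited: the circuit breaker is open
- threadpool-rejected: the thread pool rejected the task
- semaphore-rejected: the semaphore could not be acquired
- time-out: execution timed out
- failed: execution threw an exception
Apart from these, a HystrixBadRequestException does not trigger the fallback mechanism: it neither invokes the fallback nor counts toward the failure metrics. It is intended for cases such as handling HTTP 400 errors.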
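The difference between ordinary failures and bad requests can be shown with a small router, loosely modeled on the branches of `handleFallback`. This is a hypothetical sketch, not Hystrix code. `FallbackRouter`, its `Reason` enum and the local `BadRequestException` stand in for Hystrix's own types, and `java.util.concurrent.TimeoutException` stands in for HystrixTimeoutException. Short-circuit and semaphore rejections are left out because Hystrix decides those before the target runs, in applyHystrixSemantics(). Every other error falls back and is counted; a bad request is rethrown to the caller and never counted.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical router modeled on the branches of handleFallback; not Hystrix code.
class FallbackRouter {
    // Hypothetical stand-in for HystrixBadRequestException
    static class BadRequestException extends RuntimeException {
        BadRequestException(String message) { super(message); }
    }

    enum Reason { THREADPOOL_REJECTED, TIMEOUT, FAILED }

    private int failures;      // hypothetical failure counter (the metrics a circuit breaker would read)
    private Reason lastReason;

    static Reason classify(Throwable t) {
        if (t instanceof RejectedExecutionException) return Reason.THREADPOOL_REJECTED;
        if (t instanceof TimeoutException) return Reason.TIMEOUT;
        return Reason.FAILED;
    }

    <R> R onError(Throwable t, Supplier<R> fallback) {
        if (t instanceof BadRequestException) {
            // No fallback and not counted as a failure: propagate to the caller
            throw (BadRequestException) t;
        }
        lastReason = classify(t);
        failures++;
        return fallback.get();
    }

    int failures() { return failures; }

    Reason lastReason() { return lastReason; }
}
```

Not counting bad requests is what keeps a caller sending invalid input (the HTTP 400 case) from tripping the circuit breaker for everyone else.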
The fallback flow chart:
[Figure: Hystrix fallback flow chart]