前言
Hystrix 在执行时可以使用两种不同的隔离策咯 - Thread(线程)和 Semaphore(信号量)来运行。在默认情况下,Hystrix 以 Thread 隔离策略运行。每个 Hystrix 命令都在一个单独的线程池中运行,该线程池不与父线程共享它的上下文,可以自行的控制线程的执行与中断,而不必担心中断父线程的其他活动。
基于 Semaphore 的隔离,Hystrix 不需要启动新线程,如果调用超时,就会中断父线程。在容器服务环境(Tomcat)中, 中断父线程将导致抛出开发人员无法捕获的异常。要控制命令池的隔离设置,可以在 @HystrixCommand 注解上设置 commandProperties 属性,如下所示:
@HystrixCommand(
commandProperties = {
@HystrixProperty(name="execution.isolation.strategy", value="SEMAPHORE")
}
)
Hystrix 团队建议开发人员对大多数命令使用 Thread 隔离策咯。这可以保持开发人员与父线程之间更高层次的隔离。Thread 隔离比 Semaphore 隔离更重,Semaphore 隔离模型更轻量级,Semaphore 隔离模型适用于服务量很大且正在使用异步I/O编程模型(Netty)运行的情况。
ThreadLocal 与 Hystrix
在默认情况下(Thread 隔离级别),Hystrix不会将父线程的上下文传播到自身所管理的子线程中,在父线程中设置的 ThreadLocal 值在子线程中都是不可用的。
Hystrix 允许开发人员定义一种自定义的并发策略,它将包装 Hystrix 调用,并允许开发人员将附加的父线程上下文注入由 Hystrix 管理的子线程中。
开发人员需要执行以下3个操作:
- 自定义Hystrix 并发策略类
- 定义 Callable 子类,将 ThreadLocal 注入到 Hystrix 命令中
- 配置 Spring Cloud 以使用自定义 hystrix 并发策略
自定义 Hystrix 并发策略类
ThreadLocalAwareStrategy
public class ThreadLocalAwareStrategy extends HystrixConcurrencyStrategy {
private final HystrixConcurrencyStrategy existingConcurrencyStrategy;
public ThreadLocalAwareStrategy(HystrixConcurrencyStrategy existingConcurrencyStrategy) {
this.existingConcurrencyStrategy = existingConcurrencyStrategy;
}
@Override
public BlockingQueue getBlockingQueue(int maxQueueSize) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize)
: super.getBlockingQueue(maxQueueSize);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty corePoolSize,
HystrixProperty maximumPoolSize,
HystrixProperty keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize,
maximumPoolSize, keepAliveTime, unit, workQueue)
: super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}
@Override
public Callable wrapCallable(Callable callable) {
// 注入 Callable 实现, 他将设置UserContext
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.wrapCallable(DelegatingUserContextCallable.create(callable, UserContextHolder.getContext()))
: super.wrapCallable(DelegatingUserContextCallable.create(callable, UserContextHolder.getContext()));
}
}
因为 Spring Cloud 已经定义了一个 HystrixConcurrencyStrategy,所以需要检查现有的策略类是否存在,然后调用现有的并发策略的方法或调用基类的并发策略方法。
定义 Callable 子类
DelegatingUserContextCallable
@Slf4j
public class DelegatingUserContextCallable implements Callable {
private final Callable delegate;
private UserContext originalUserContext;
public DelegatingUserContextCallable(Callable delegate, UserContext userContext) {
this.delegate = delegate;
this.originalUserContext = userContext;
}
/**
* 方法会在被 {@link HystrixCommand} 注解保护的方法之前调用
*
* @author nza
* @createTime 2021/3/7 13:58
*/
@Override
public V call() throws Exception {
// 设置当前线程的上下文对象
UserContextHolder.setUserContext(originalUserContext);
try {
return delegate.call();
} finally {
this.originalUserContext = null;
log.info("删除 hystrix 线程本地变量, currentIp: {}", UserContextHolder.getContext().getCurrentIp());
UserContextHolder.remove();
}
}
public static Callable create(Callable delegate, UserContext userContext) {
return new DelegatingUserContextCallable(delegate, userContext);
}
}
将父线程的 ThreadLocal 值设置到子线程中。
配置Spring Cloud 使用自定义并发策略
ThreadLocalConfiguration
@Configuration
public class ThreadLocalConfiguration {
@Autowired(required = false)
private HystrixConcurrencyStrategy existingConcurrencyStrategy;
@PostConstruct
public void init() {
// 保留现有的插件引用
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
.getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
.getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
.getPropertiesStrategy();
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance()
.getCommandExecutionHook();
HystrixPlugins.reset();
HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalAwareStrategy(existingConcurrencyStrategy));
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
}
}
保留已有的插件引用,并将自定义的并发策略类挂勾到 Spring Cloud 与 Hystrix 中。
小结
Hystrix 支持两种不同的隔离策咯 - Thread(线程)和 Semaphore(信号量),默认使用 Thread 模式。
Hystrix 允许自定义并发策略的实现,将父线程上下文注入 HyStrix 管理的子线程中。