笔者公司使用的是k8s在搭建公司的微服务框架,没有使用java系的spring cloud。在rpc框架选择上,为了降低他人的学习成本,没有使用thrift、grpc等rpc框架,而是使用了以http协议为主的“伪rpc框架”feign。
当前还是使用良好,不过由于一些业务问题,单一的http协议在网络调度复杂的微服务框架中,还是会带来一些调度问题。因此最近引入了hystrix,来提升整体系统网络调用上的健壮性。
首先介绍一下Hystrix,因为github主页的原意:
Hystrix是一个延迟和容错库,旨在隔离对远程系统、服务和第三方库的调用,在复杂的分布式系统中,故障是无法避免的,它可以阻止级联故障,并提供整体系统的容错能力。
Hystrix提供如下的能力:
- 延迟和容错能力 阻止级联故障,优雅的降级,快速失败快速回复。基于线程和信号量的隔离,并可以熔断。
- 实时操作:实时监控并变更配置。 拥有快速的反馈机制 报警->做出决定->修改配置->查看变更结果。
- 并发:并行执行,并发获取请求缓存。通过请求折叠自动批处理。
关于Hystrix的使用,github的wiki已经介绍的够详细了,这里列出一些我们使用的要点,和遇到的问题。
1. 设置了fallback,依然没有降级,而是抛出来Reject异常。
第一个问题比较简单,Hystrix为fallback基于信号量也做了隔离和限制,默认是10个,超出会抛异常。
在我们使用时,这个机制作用不会太大,我们还是希望所有的触发fallback,通过fallback处理中的日志和事件来进行报警通知。1
hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests=999999
2. 选择threadpool隔离还是semaphore?
目前一些新兴的框架,比如阿里最近开源的Sentinel,也是基于信号量的隔离模式,先说下优势:
- 更轻量级,性能更高。
- 没有创建线程池线程和线程切换的损耗。
- 对开发模型的影响最小。 对我们的开发来讲,使用feign做rpc调用时,需要将一些信息(比如登录token等)作为header或者cookie传给被调用方。如果是thread方式,之前的逻辑是业务但信息存储到threadlocal中,client获取即可。如果用线程池,我们还要做基于threadlocal的封装和传递才行。
那么线程池呢:
- 性能损耗没有那么大,这里可以参考下Hystrix的官方介绍,他们做了大量的测试,对于常见的rpc调用,线程池的消耗比较小,是可以接受的。除非是本地快速的并发任务(hundreds per second, per instance),才需要使用信号量。
- 隔离性更强。
这里着重说下隔离性强的问题,着重点在超时这个问题上。
比如一个快速响应的业务,常规执行时间是20ms,因此我们设置它的超时时间为50ms,并发数为100。如果由于网络抖动,或者异常的锁,或者其它资源导致了问题等等,使得1~2s内所有的任务时间延长到5s。那么熔断是怎么处理的呢?
Threadpool模式,超过50ms获interrupt所有的线程,并触发熔断,并根据配置,快速重试,会在发生抖动的1s左右开始恢复连接并回复正常。
而对于信号量模式,就不那么乐观了。
Sentinel甚至问题更严重,我参考官网的demo写了个例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.csp.sentinel.util.TimeUtil;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author zhaohaifeng
* @date 2019-02-02
*/
public class SentinelTest {
private static final String KEY = "abc";
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger success = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static volatile boolean stop = false;
private static final int threadCount = 100;
private static int seconds = 60 + 40;
public static void main(String[] args) throws Exception {
tick();
initDegradeRule();
for (int i = 0; i < threadCount; i++) {
Thread entryThread = new Thread(new Runnable() {
public void run() {
while (true) {
Entry entry = null;
try {
TimeUnit.MILLISECONDS.sleep(5);
entry = SphU.entry(KEY);
// token acquired
int p = pass.incrementAndGet();
// sleep 600 ms, as rt
if (p <= 100) {
TimeUnit.MILLISECONDS.sleep(5000);
}
success.incrementAndGet();
} catch (Exception e) {
block.incrementAndGet();
} finally {
total.incrementAndGet();
if (entry != null) {
entry.exit();
}
}
}
}
});
entryThread.setName("working-thread");
entryThread.start();
}
}
private static void initDegradeRule() {
List<DegradeRule> rules = new ArrayList<DegradeRule>();
DegradeRule rule = new DegradeRule();
rule.setResource(KEY);
// set threshold rt, 10 ms
rule.setCount(10);
rule.setGrade(RuleConstant.DEGRADE_GRADE_RT);
rule.setTimeWindow(10);
rules.add(rule);
DegradeRuleManager.loadRules(rules);
}
private static void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
static class TimerTask implements Runnable {
public void run() {
long start = System.currentTimeMillis();
System.out.println("begin to statistic!!!");
long oldTotal = 0;
long oldPass = 0;
long oldBlock = 0;
long oldSuccess = 0;
while (!stop) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
long globalTotal = total.get();
long oneSecondTotal = globalTotal - oldTotal;
oldTotal = globalTotal;
long globalPass = pass.get();
long oneSecondPass = globalPass - oldPass;
oldPass = globalPass;
long globalBlock = block.get();
long oneSecondBlock = globalBlock - oldBlock;
oldBlock = globalBlock;
long globalSuccess = success.get();
long oneSecondSuccess = globalSuccess - oldSuccess;
oldSuccess = globalSuccess;
System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
+ ", pass:" + oneSecondPass + ", block:" + oneSecondBlock+",succcess:"+oneSecondSuccess);
if (seconds-- <= 0) {
stop = true;
}
}
long cost = System.currentTimeMillis() - start;
System.out.println("time cost: " + cost + " ms");
System.out.println("total:" + total.get() + ", pass:" + pass.get()
+ ", block:" + block.get());
System.exit(0);
}
}
}
在这个例子中,第1s的前100个任务延迟到5s,后续恢复正常。那么我们期望的熔断策略也是,第1s触发熔断,并能快速恢复。
而Sentinel呢,参见输出:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20begin to statistic!!!
INFO: log base dir is: /Users/zhaohaifeng/logs/csp/
INFO: log name use pid is: false
1549082439807, total:145, pass:244, block:0,succcess:145
1549082440807, total:169, pass:169, block:0,succcess:169
1549082441811, total:169, pass:169, block:0,succcess:169
1549082442815, total:171, pass:171, block:0,succcess:171
1549082443819, total:170, pass:170, block:0,succcess:170
1549082444823, total:14347, pass:30, block:14218,succcess:129
1549082445828, total:17198, pass:0, block:17198,succcess:0
1549082446829, total:17266, pass:0, block:17266,succcess:0
1549082447833, total:17161, pass:0, block:17161,succcess:0
1549082448834, total:17377, pass:0, block:17377,succcess:0
1549082449838, total:17182, pass:0, block:17182,succcess:0
1549082450840, total:17178, pass:0, block:17178,succcess:0
1549082451845, total:17100, pass:0, block:17100,succcess:0
1549082452850, total:17085, pass:0, block:17085,succcess:0
1549082453854, total:17341, pass:0, block:17341,succcess:0
1549082454858, total:17125, pass:15003, block:2123,succcess:15003
1549082455863, total:17304, pass:17303, block:0,succcess:17303
可以看到第1s前50个任务后,整体恢复正常,但还是出现了长达20s的熔断。
3. 即使并发任务数小于coreSize,还是会出现fallback
场景是这样的:coreSize=100,每次并发提交20个任务,前5次不会fallback,第6次开始每次会触发少量fallback。
这种情况通常是没有设置queueSizeRejectionThreshold这个参数,当5个线程进入队列就会触发降级。
但问题是任务数比线程池的工作线程数corePoolSize小,为什么会进入队列呢?
这是由于java的threadpoolexecutor的机制导致的:
- 如果运行的线程少于corePoolSize,则线程池会添加新的线程。
- 如果运行的线程等于获多于corePoolSize,则线程池会让心的请求进入队列,而不添加新的线程。
- 如果无法将请求加入队列(队列满),则判断当前活跃线程数是否超过maxPoolSize,如果没有超过,创建新线程执行,如果超过,任务会被拒绝。
前5次提交,线程池会创建工作线程到100个,而执行完任务之后这些工作线程不会销毁(没有设置keepalivetime),一直存在。这时有新的任务,即使工作线程空闲,还是会先进入队列。导致出现排队情况。
4. 和feign结合后,线程池默认是一个client共享。
这点和我们的使用逻辑相悖。我们的一个client就是单独业务的聚合,有这个单独业务的查询、修改等等,有的耗时长,有的耗时端。我们不可能要因为线程池共享的问题将我们的业务逻辑拆分,速度快的一个client,速度慢的一个client。
幸亏Hystrix有一个参数threadPoolKeyOverride,可以为每个方法单独设置线程池。1
2## 设置DemoHystrixClient这个client的get方法作为单独的线程池。
hystrix.command.DemoHystrixClient#get().threadPoolKeyOverride=pool-DemoHystrixClient#get()