概要
对于连接池,有三个重要逻辑:获取连接 ,创建连接,维持连接 以及回收连接。
同时由于涉及到占用数据库的连接资源,因此连接池需要严格维护几个数字:
- maxPoolCount:也可以叫maxActiveCount连接池的最大活跃连接数量。
- minPoolCount:也可以叫minIdleCount,连接池的最少保持空闲连接数量。
获取连接
druid的连接池,我们先看下几个主要的城边变量:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// stats 这些都是统计使用,暂时忽略
private volatile long recycleErrorCount = 0L;
。。。。。
// store
// 真正的连接池存储
private volatile DruidConnectionHolder[] connections;
// 当前的连接数量
private int poolingCount = 0;
// 当前活跃的链接数量
private int activeCount = 0;
// 当前废弃的链接数量
private long discardCount = 0;
......
//
private DruidConnectionHolder[] evictConnections;
private DruidConnectionHolder[] keepAliveConnections;
在DruidDataSource的父类DruidAbstractDataSource中,也有几个重要的属性1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19...
// 初始化容量
protected volatile int initialSize = DEFAULT_INITIAL_SIZE;
// 最大连接数
protected volatile int maxActive = DEFAULT_MAX_ACTIVE_SIZE;
// 最少空闲连接数
protected volatile int minIdle = DEFAULT_MIN_IDLE;
//最大空闲连接数,已经废弃
protected volatile int maxIdle = DEFAULT_MAX_IDLE;
// 最大等待时长
protected volatile long maxWait = DEFAULT_MAX_WAIT;
protected int notFullTimeoutRetryCount = 0;
....
// 全局锁
protected ReentrantLock lock;
protected Condition notEmpty;
protected Condition empty;
// 活跃连接锁
protected ReentrantLock activeConnectionLock = new ReentrantLock();
druid本质上是一个生产消费者模型,因此其中的lock是druid最重要的一个锁,下面的empty是lock的两个消费者condition,notEmpty是生产者condition。该lock可以根据配置生成公平锁或非公平锁,默认是非公平,效率较高。
看代码能发现,除了一些统计数据使用Atomic的cas实现之外,大部分的关键数据和count的变更都是依赖lock实现线程安全的。
现在是关键的getConnection()的实现,参见getConnectionInternal():1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
//一些常规检测
....
for (boolean createDirect = false;;) {
//缺省情况下,一个DruidDataSource会使用两个线程分别用于创建连接和销毁或检测连接。
//在分库分表的某些场景,可能需要数百甚至数千个数据库,因此会创建大量的线程。这里暂时只讲解普通连接。
....
}
try {
//全局锁 获取连接
lock.lockInterruptibly();
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}
...
try{
// 增加连接数
connectCount++;
//根据是否有超时时间获取连接
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
}finally{
// 释放全局锁
lock.unlock();
}
我们在看takeLast()方法,本质上是一个生产-消费者模型的消费者,获取连接。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27// 获取连接,注意,这时线程还持有之前获取的的全局lock,因此不会出现并发问题。
DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
try {
// poolingCount就是可用连接数
while (poolingCount == 0) {
//池子里没有,等待连接,唤醒empty
emptySignal(); // send signal to CreateThread create connection
....
try {
// notEmpty等待,释放lock锁,等待唤醒
notEmpty.await(); // signal by recycle or creator
} finally {
notEmptyWaitThreadCount--;
}
notEmptyWaitCount++;
...
}
}...
// decrement poolingCount;
decrementPoolingCount();
// 清除connections中相应的连接。
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
return last;
}
创建连接
在init时会创建初始的连接,这里暂时不表。
另一个是在获取连接的时候发现连接不够而创建,参见emptySignal()方法。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private void emptySignal() {
if (createScheduler == null) {
empty.signal();
return;
}
if (createTaskCount >= maxCreateTaskCount) {
return;
}
if (activeCount + poolingCount + createTaskCount >= maxActive) {
return;
}
createTaskCount++;
CreateConnectionTask task = new CreateConnectionTask();
this.createSchedulerFuture = createScheduler.submit(task);
}
有个容易混淆的是,在emptySignal()这个方法中,看起来是做了一些判断,然后创建了创建连接的任务交给createScheduler执行,但普通使用情况中,createScheduler=null,所以直接就返回了。
那我们看看普通场景下真正创建连接的producer:CreateConnectionThread,而不是schedule使用的CreateConnectionTask。CreateConnectionThread在连接池初始化时创建,在连接池销毁时销毁。
1 | public void run() { |
通过上面分析,当消费者发现无连接可用时,会唤醒生成者,而生产者发现最大连接数已满,再次await。这带来一个性能问题:在高并发情况下,如果连接数已满,新发起的连接请求都会经过这一次消费者await,生产者被notify,然后再次await的场景,产生大量的线程切换,这是一笔不菲的消耗。
为什么不在消费者获取连接的时候去检测是否连接已满呢?只有连接数不足的情况下才去唤醒emptySignal()。
维持连接
Druid使用在启动时会创建一个DestoryTask,放在schedule线程池,默认每分钟执行一次shrink方法来维持连接,同时也会将超时的连接废弃掉,这里我们只看shrink,参见代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71public void shrink(boolean checkTime, boolean keepAlive) {
//全局锁
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
int evictCount = 0;
int keepAliveCount = 0;
try {
if (!inited) {
return;
}
//这里代码比较多,简单来说就是在所有连接中,找出哪些存活时间太久(maxEvictableIdleTimeMillis)需要释放,哪些需要keepalive。
//同时需要判断灵位一个属性,minIdle,最少连接数。
//这里有个关键步骤,将所有要处理的链接从主池子connections里去掉,后续keepalive将或者的连接加回来。
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}
} finally {
lock.unlock();
}
// 释放存活时间过长的连接
if (evictCount > 0) {
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCount.incrementAndGet();
}
Arrays.fill(evictConnections, null);
}
if (keepAliveCount > 0) {
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();
boolean validate = false;
try {
// 重新验证连接是否可用。
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
// skip
}
// 连接可用,加回主连接池,不可能关闭。
if (validate) {
holer.lastActiveTimeMillis = System.currentTimeMillis();
put(holer);
} else {
JdbcUtils.close(connection);
}
}
Arrays.fill(keepAliveConnections, null);
}
}
值得注意的是,Druid为每种数据库实现了不同逻辑的isValidateConnection()方法,通常检测Valid的方法是调用一个简单的sql语句,但很多数据库都提供了不同的高效手段,比如通过mysql的ping方法实现高效检测。
回收连接
在Connection的生命周期中,使用完以此连接,回调用Connection.close()方法。
Druid自定义DruidPooledConnection实现了close方法,不是关闭连接,而是调用DruidDataSource回收。
这里笔者就不赘述了,大家查看DruidDataSource.recyle()方法即可,逻辑比较简单,只是判断条件比较多。
总结
Druid使用多线程中的常见案例生产者-消费者模型来构建整个并发体系,采用ReenTrantLock作为技术基础,高效的实现了数据库连接池。同时,它还有这丰富的统计模型进行数据库使用数据分析(希望后面有机会分析)。
但有些遗憾的是,Druid对于CAS模型使用较少,锁场景比较多,在高并发情况下线程调度还是会消耗不少的时间。看源代码发现,Druid应该有两套开发思路,一种是在核心的资源调度上用ReetTrantLock,而在统计场景中较多的使用了CAS,没有形成很好的统一。