Druid连接池源码分析

概要

对于连接池,有三个重要逻辑:获取连接创建连接,维持连接 以及回收连接
同时由于涉及到占用数据库的连接资源,因此连接池需要严格维护几个数字:

  • maxPoolCount:也可以叫maxActiveCount连接池的最大活跃连接数量。
  • minPoolCount:也可以叫minIdleCount,连接池的最少保持空闲连接数量。

获取连接

druid的连接池,我们先看下几个主要的城边变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// stats 这些都是统计使用,暂时忽略
private volatile long recycleErrorCount = 0L;
。。。。。
// store
// 真正的连接池存储
private volatile DruidConnectionHolder[] connections;
// 当前的连接数量
private int poolingCount = 0;
// 当前活跃的链接数量
private int activeCount = 0;
// 当前废弃的链接数量
private long discardCount = 0;
......
//
private DruidConnectionHolder[] evictConnections;
private DruidConnectionHolder[] keepAliveConnections;

在DruidDataSource的父类DruidAbstractDataSource中,也有几个重要的属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
// 初始化容量
protected volatile int initialSize = DEFAULT_INITIAL_SIZE;
// 最大连接数
protected volatile int maxActive = DEFAULT_MAX_ACTIVE_SIZE;
// 最少空闲连接数
protected volatile int minIdle = DEFAULT_MIN_IDLE;
//最大空闲连接数,已经废弃
protected volatile int maxIdle = DEFAULT_MAX_IDLE;
// 最大等待时长
protected volatile long maxWait = DEFAULT_MAX_WAIT;
protected int notFullTimeoutRetryCount = 0;
....
// 全局锁
protected ReentrantLock lock;
protected Condition notEmpty;
protected Condition empty;
// 活跃连接锁
protected ReentrantLock activeConnectionLock = new ReentrantLock();

druid本质上是一个生产消费者模型,因此其中的lock是druid最重要的一个锁,下面的empty是lock的两个消费者condition,notEmpty是生产者condition。该lock可以根据配置生成公平锁或非公平锁,默认是非公平,效率较高。

看代码能发现,除了一些统计数据使用Atomic的cas实现之外,大部分的关键数据和count的变更都是依赖lock实现线程安全的。

现在是关键的getConnection()的实现,参见getConnectionInternal():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
//一些常规检测
....
for (boolean createDirect = false;;) {
//缺省情况下,一个DruidDataSource会使用两个线程分别用于创建连接和销毁或检测连接。
//在分库分表的某些场景,可能需要数百甚至数千个数据库,因此会创建大量的线程。这里暂时只讲解普通连接。
....
}

try {
//全局锁 获取连接
lock.lockInterruptibly();
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}
...
try{
// 增加连接数
connectCount++;
//根据是否有超时时间获取连接
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
}finally{
// 释放全局锁
lock.unlock();
}

我们在看takeLast()方法,本质上是一个生产-消费者模型的消费者,获取连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 获取连接,注意,这时线程还持有之前获取的的全局lock,因此不会出现并发问题。
DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
try {
// poolingCount就是可用连接数
while (poolingCount == 0) {
//池子里没有,等待连接,唤醒empty
emptySignal(); // send signal to CreateThread create connection

....
try {
// notEmpty等待,释放lock锁,等待唤醒
notEmpty.await(); // signal by recycle or creator
} finally {
notEmptyWaitThreadCount--;
}
notEmptyWaitCount++;
...
}
}...
// decrement poolingCount;
decrementPoolingCount();
// 清除connections中相应的连接。
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;

return last;
}

创建连接

在init时会创建初始的连接,这里暂时不表。
另一个是在获取连接的时候发现连接不够而创建,参见emptySignal()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void emptySignal() {
if (createScheduler == null) {
empty.signal();
return;
}

if (createTaskCount >= maxCreateTaskCount) {
return;
}

if (activeCount + poolingCount + createTaskCount >= maxActive) {
return;
}

createTaskCount++;
CreateConnectionTask task = new CreateConnectionTask();
this.createSchedulerFuture = createScheduler.submit(task);
}

有个容易混淆的是,在emptySignal()这个方法中,看起来是做了一些判断,然后创建了创建连接的任务交给createScheduler执行,但普通使用情况中,createScheduler=null,所以直接就返回了。

那我们看看普通场景下真正创建连接的producer:CreateConnectionThread,而不是schedule使用的CreateConnectionTask。CreateConnectionThread在连接池初始化时创建,在连接池销毁时销毁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public void run() {
initedLatch.countDown();
long lastDiscardCount = 0;
int errorCount = 0;
//循环检测
for (;;) {
// addLast
try {
// 被唤醒时,全局加锁
lock.lockInterruptibly();
} catch (InterruptedException e2) {
break;
}
producerStartCount++;

long discardCount = DruidDataSource.this.discardCount;
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;

try {
boolean emptyWait = true;

if (createError != null
&& poolingCount == 0
&& !discardChanged) {
emptyWait = false;
}

if (emptyWait
&& asyncInit && createCount < initialSize) {
emptyWait = false;
}

if (emptyWait) {
// 必须存在线程等待,才创建连接
if (poolingCount >= notEmptyWaitThreadCount //
&& !(keepAlive && activeCount + poolingCount < minIdle)) {
emptyWaitCount1++;
empty.await();
}

// 防止创建超过maxActive数量的连接
// 这种情况是虽然有现成在等待连接,但连接池已经达到最大连接数,因此await()
if (activeCount + poolingCount >= maxActive) {
emptyWaitCount2++;
empty.await();
continue;
}
}

} catch (InterruptedException e) {
// 一些错误处理
} finally {
// 释放索
lock.unlock();
}
// 如果需要创建物理连接
PhysicalConnectionInfo connection = null;
//创建物理连接
try {
startConnectionCount.incrementAndGet();
connection = createPhysicalConnection();
} catch (SQLException e) {
// 错误处理
} catch (RuntimeException e) {
// 错误处理
} catch (Error e) {
// 错误处理
}

if (connection == null) {
continue;
}

// 存放连接
// 这里需要注意的是,有可能存放失败,说明创建了多于的连接,因此关闭
if (!result) {
JdbcUtils.close(connection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}

errorCount = 0; // reset errorCount
}
}

通过上面分析,当消费者发现无连接可用时,会唤醒生成者,而生产者发现最大连接数已满,再次await。这带来一个性能问题:在高并发情况下,如果连接数已满,新发起的连接请求都会经过这一次消费者await,生产者被notify,然后再次await的场景,产生大量的线程切换,这是一笔不菲的消耗。
为什么不在消费者获取连接的时候去检测是否连接已满呢?只有连接数不足的情况下才去唤醒emptySignal()。

维持连接

Druid使用在启动时会创建一个DestoryTask,放在schedule线程池,默认每分钟执行一次shrink方法来维持连接,同时也会将超时的连接废弃掉,这里我们只看shrink,参见代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public void shrink(boolean checkTime, boolean keepAlive) {
//全局锁
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}

int evictCount = 0;
int keepAliveCount = 0;
try {
if (!inited) {
return;
}
//这里代码比较多,简单来说就是在所有连接中,找出哪些存活时间太久(maxEvictableIdleTimeMillis)需要释放,哪些需要keepalive。
//同时需要判断灵位一个属性,minIdle,最少连接数。

//这里有个关键步骤,将所有要处理的链接从主池子connections里去掉,后续keepalive将或者的连接加回来。
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}

} finally {
lock.unlock();
}

// 释放存活时间过长的连接
if (evictCount > 0) {
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCount.incrementAndGet();
}
Arrays.fill(evictConnections, null);
}

if (keepAliveCount > 0) {
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();

boolean validate = false;
try {
// 重新验证连接是否可用。
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
// skip
}

// 连接可用,加回主连接池,不可能关闭。
if (validate) {
holer.lastActiveTimeMillis = System.currentTimeMillis();
put(holer);
} else {
JdbcUtils.close(connection);
}
}
Arrays.fill(keepAliveConnections, null);
}
}

值得注意的是,Druid为每种数据库实现了不同逻辑的isValidateConnection()方法,通常检测Valid的方法是调用一个简单的sql语句,但很多数据库都提供了不同的高效手段,比如通过mysql的ping方法实现高效检测。

回收连接

在Connection的生命周期中,使用完以此连接,回调用Connection.close()方法。
Druid自定义DruidPooledConnection实现了close方法,不是关闭连接,而是调用DruidDataSource回收。
这里笔者就不赘述了,大家查看DruidDataSource.recyle()方法即可,逻辑比较简单,只是判断条件比较多。

总结

Druid使用多线程中的常见案例生产者-消费者模型来构建整个并发体系,采用ReenTrantLock作为技术基础,高效的实现了数据库连接池。同时,它还有这丰富的统计模型进行数据库使用数据分析(希望后面有机会分析)。
但有些遗憾的是,Druid对于CAS模型使用较少,锁场景比较多,在高并发情况下线程调度还是会消耗不少的时间。看源代码发现,Druid应该有两套开发思路,一种是在核心的资源调度上用ReetTrantLock,而在统计场景中较多的使用了CAS,没有形成很好的统一。