PAP

  • Home

  • About

  • Tags

  • Categories

  • Archives

整洁架构

Posted on 2019-06-18 | In 架构

整洁架构

翻译自Bob大叔的 https://blog.cleancoder.com/uncle-bob/2012/08/13/the-clean-architecture.html

最近这些年来,我们看到过很多框架设计的想法,这包括:

  • 六边形框架(端口和适配器) Alistair Cockburn提出,Steve Freeman和Nat Pryce在他们精彩的《Growing Object Oriented Software》一书种采用。
  • 洋葱框架 Jeffrey Palermo
  • Screaming Architecture 来自去我去年的博客
  • DCI James Coplien, and Trygve Reenskaug
  • BCE Ivar Jacobson的《Object Oriented Software Engineering: A Use-Case Driven Approach》书中提到

尽管这些架构在细节上个有不同,但又都很相似。 他们有这同样的目标,即关注分离。它们通过分层的方式,至少拥有一个业规则层和一个接口层,来解决这个问题。

每个架构都有以下特点:

  1. 框架独立。架构并不依赖那些功能丰富的软件库。我们可以像使用工具一样使用框架,而不是必须被它们约束。
  2. 容易测试。即使没有数据库、UI、WebServer和其它外部原件,业务规则依然可以测试。
  3. UI独立。UI可以很容易的变化,比如一个Web UI可以在不变更业务规则的情况下被替换成一个控制台界面UI。
    1.数据库独立。你可以将Oracle、SqlServer,更换成其它数据库Mongo、BigTable、CouchDB或者其它。你的业务规则并不依赖数据库。
    1.独立于任何存在的形式。事实上,你的业务规则对外界一无所知。

本文顶部的图片试图将所有架构融合到一个可实现的想法中。

依赖原则

每个同心圆代表软件的不同区域。 通常来说,越深的层次,软件的级别越高。 外圆是方法,内圆是规则。

是这个架构正常工作的就是依赖规则。这个规则规定,代码层面只能向内依赖。内圆不能了解外圆的任何事项。特别的是,在外圆种声明的信息,包括函数、类、变量或者任何软件实体,都不能被内圆的代码引用到。

同样,外圆种的数据格式,尤其是那些框架种生成的,也坚决不能被内圆使用到。 我们不希望外圆种的任何事情影响到内圆。

实体

实体封装了业务规则。一个实体可以是一个拥有方法的对象,或者是数据结构和函数的集合,只要是可以被不同的应用使用就可以。

如果没有XX,只有一个单独的应用,那么实体就是这个应用的业务对象。它们封装了一般的和高级的规则。当外部变化时,它们是最不容易变化的。 比如,它们不会因为分页或者安全问题而产生变化。任何特定的应用层变化都不应该影响的实体层。

用例

该层包含应用的业务规则。它概括和实现了整个系统的用例,这些用例编排实体之间的数据流,并让这些实体使用其企业范围的业务规则来实现用例。

我们不期望该层的改动会影响到实体层,我们也不期望该层会由于外部改动比如数据库、UI或者通用框架的改动而影响。该层和这些问题是隔离的。

应用的操作应该要影响到用例层和上面的软件。如果用例的细节改动了,那么该层的代码也会被影响。

接口适配器

该层种的软件是一组适配器,它们将便于用例和实体的数据格式转换成外部(比如数据库和Web)使用的数据。比如这层包含整个GUI的MVC框架,Presenter,View和Controller都在这里。这个模型基本上是,数据从controller流转到用例,然后再从用例返回的presenter和view。

同样,在这一层中,数据从便于实体和用例的格式转换为便于正在使用的持久性框架(即数据库)的格式。这个圆内部的代码都不应该知道关于数据库的任何信息。如果数据库是一个SQL数据库,那么所有的SQL都应该限制在这个层,特别是这个层中与数据库有关的部分。

将外部服务提供的数据格式转换成内部用例和实体使用的数据格式的适配器,也是在这一层种。

框架和驱动

最外层的圆是由像数据库、Web框架等这种框架和工具组成。通常除了将代码粘贴过来向内部传递数据,这一层不会有太多工作。

这一层是所有细节所在,Web是细节,数据库也是细节。我们将这些放在最外层是让它们不会产生什么危害。

只有4个圆?

并不是,图中只是例子,没有规定只能是这4个,实际上肯定需要超过这4个。无论如何,依赖规则总是固定的,代码中外侧代码依赖内侧代码,当你向内移动的时候,抽象的层次就增加了。外层圆的逻辑是内层的实现细节,越靠近圆心,软件越抽象,封装的越完全。最里面的圆就是核心的业务规则。

跨越边界

在图的右下方有个例子,描述了我们如何跨域圆的边界。它演示了controller和view如何与下一层的用例进行通信。请注意这个控制流程,它从controller开始,穿过用例层,在presenter中执行。还要注意代码依赖性,它们中的每一个都指向用例的内部。

我们通常用依赖倒置 来解决这一明显的矛盾。在像Java这样的语言中,我们将使用接口和继承关系,使得源代码依赖性在边界正常的点上控制反转。

例如,假如用例需要调用presenter。但是不能直接调用,因为这将违反依赖关系规则:内部圆不能涉及外部圆中的任何逻辑。因此,我们在内环中有一个用例调用接口(这里显示为用例输出端口),并让外环中的presenter实现它。

架构中涉及到跨边界操作的都使用这个技术,我们利用动态多态性来维护 代码依赖,这样无论控制流的方向如何,是向内还是向外,我们都可以遵循依赖规则。

哪些数据跨域边界

跨越边界的数据都是简单的数据形式,你可以使用基础的数据结构,简单的数据传输对象,封装成hashmap,或者构建城对象。 重点是,分离独立且简单的数据才可以跨界。不能传输实体或者数据库行,我们不希望这些数据结构有任何违反依赖规则的依赖关系。

例如,很多数据库框架对于查询都返回了一个的数据格式,我们通常叫RowStructure。我们不能将RowStructure传到内部因为这将违反依赖规则,它会需要内部的圆了解外部圆的信息,

因此当我们需要将数据穿过边界时,总是使用最适合内部圆的数据形式。

结论

遵循这些简单的规则并不困难,并且可以减少很大一部分头痛的时间:)。通过将软件分则并且遵循依赖规则,你将创造一个可以测试的,并且有很多其它益处的系统。任何系统的外部模块,像数据库或者web框架,废弃时,你都可以几乎无成本的替换它们。

分布式协议paxos讲解

Posted on 2019-04-02 | Edited on 2019-06-18

CAP

说到分布式,首先要说的是CAP。
CAP,指的是:

  • Consistency 一致性,所有节点之间数据是保持一致的。
  • Availability 可用性,向服务发起的请求可以立即得到非错的响应。
  • Partition tolerance 分区容错性,可以容忍由于分区导致的一些异常问题,比容网络延时、通讯异常、服务宕机等等。

CAP理论是由加州大学伯克利分校的计算机科学家埃里克·布鲁尔提出的猜想,时在一个分布式系统中,这三者无法同时满足。后续这个理论得到了证明从而成为一个定理。

CAP不可兼容

对于大多数系统,P分区容错性时最重要的性质(不然还讲分布式做什么:))因此这套体系关键点在于C一致性和A可用性的矛盾。即在一个拥有大量节点的服务中,如果要保证高可用性,就要有一定的容灾能力,设计上数据存储在多个服务器当中。但在多节点的数据的传输随时可能由于节点宕机、网络延迟等等问题导致数据不可达,那么节点在响应数据请求时,响应结果可能会出现不一致的情况。 如果要保持一致,只能降低可用性(比如在数据不一致时服务不可用等等)。

现在流行的分布式协议,其实都是在CAP中,尤其是CA中找寻一个适当的平衡点,从而有了Base理论:既然无法做到高可用强一致性,那么就选择基本可用最终一致性。

  • BA Basically Available 基本可用性;
  • Soft 软状态;
  • Eventual Consistency 最终一致性。

AP和CP并非绝对

同时在笔者看来,CAP虽是一个定理,但每个分布式协议或者分布式实现,并不是非黑即白的。不是说一个协议,要么说CP,可用性不强,要么是AP,一致性不强。 而是会找寻一些平衡,按照自身的理念去提供最好的服务。

比如,zookeeper是哪种呢?网上大部分会说,zk是CP的, 但笔者看来却并不是。
之所以说zk是cp,是因为zk的writer是全部转发到leader来处理,并在大多数follower认可之后写入成功。因此只要写入了数据就一定是有效且不会丢失的。 但当leader服务宕机之后,需要重新选举leader,才可以继续提供服务,因此zk并不是高可用的。

但在read时,zk为了提高吞吐率,client只要读取follower的数据就ok,那么zk就并非一个强一致的应用。虽然可以通过follower的sync(实时获取leader的最新数据)来处理成读强一致性,但可用性就牺牲太大了。

同时,zk还提供了一种readonly状态机制,即使在zk 进行leader选举时,也可以提供只读服务。那么在一个read远远多于write的场景,zk甚至可以说是满足AP的,read弱一致性,但高可用。

这个世界上没有银弹,对于分布式尤其如此。 固然可能随着技术的更新换代,有些协议或者技术不符合当前场景,但大部分其实是没有优劣之分,只是各有取舍,根据业务的实际场景选择合适的技术方式方案,才是我们开发者所真正需要掌握的技能。

Paxos

paxos是1990年由莱斯利·兰伯特(英语:Leslie Lamport,LaTeX中的「La」)于1990年提出的一种基于消息传递且具有高度容错特性的一致性算法。

Basic Paxos

basic paxos是paxos的最初版本,也是最基础的分布式协议。
在协议中,会有几个角色:

  • proposer 提案者。
  • acceptor 提案处理者,针对提案进行判断,通过则提案成为决议。
  • learner 获取所有被批准的提案(决议),用于返回给请求方。

paxos决议的过程如下(摘自维基百科):

  1. 准备(prepare)阶段:
    1. proposer选择一个提案编号n并将prepare请求发送给acceptors中的一个多数派;
    2. acceptor收到prepare消息后,如果提案的编号大于它已经回复的所有prepare消息(回复消息表示接受accept),则acceptor将自己上次接受的提案回复给proposer,并承诺不再回复小于n的提案;
  2. 批准(accept)阶段:
    1. 当一个proposer收到了多数acceptors对prepare的回复后,就进入批准阶段。它要向回复prepare请求的acceptors发送accept请求,包括编号n和根据P2c决定的value(如果根据P2c没有已经接受的value,那么它可以自由决定value)。
    2. 在不违背自己向其他proposer的承诺的前提下,acceptor收到accept请求后即批准这个请求。

流程图如下:

1
2
3
4
5
6
7
8
9
10
假设之前的最新提案号是N-1
Client Proposer Acceptor Learner
| | | | | | |
X-------->| | | | | | Request 提交数据变更V
| X--------->|->|->| | | Prepare(N) prepare1.1 proposer进行提案。新的提案号为N
| |<---------X--X--X | | Promise(N,{Va,Vb,Vc}) prepare1.2 三台Acceptor发现都是最新的版本,因此做出承诺(不再 回复小于N的提案)
| X--------->|->|->| | | Accept!(N,V) accept1.1 proposer发现多数acceptors回复通过,则提交批准请求
| |<---------X--X--X------>|->| Accepted(N,V) acceptor批准请求,并同步到learners
|<---------------------------------X--X Response learners处理准备的决议(V),并返回给客户端,该流程有6步。
| | | | | | |

流程通过时比较顺利,但在并发环境下,可能会出现资源浪费和活锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
假设当前最新提案号都是N-1
Client Proposer Acceptor Learner
| | | | | | | | |
X ------->| | | | | | | Request(V) Client1 提交数据变更V
| X-------->| | | | | | Request(W) Client2 提交数据变更W
| | X----------->|->|->| | | Prepare(N,V) proposer1进行提案,新的提案号是N
| | |<-----------X--X--X | | Promise(N,V) prepare1.2 三台Acceptor发现都是最新的版本,因此做出承诺(不再回复小于N的提案)
| | | X--------->|->|->| | | Prepare(N,W) proposer2进行提案,此时并不知道p1的提案,因此提案号也是N。
| | | |<---------X--X--X | | Reject(N,W) 根据prepare1.2的协议,acceptors已经回复了proposer1编号为N的提案,针对proposer2,直接否决,并返回最新的提案号为N。
| | | X--------->|->|->| | | Prepare(N+1,W) proposer2发现最新的提案号是N,自己保留的已经失效,因此更新提案号(N+1)重新提案。
| | | |<---------X--X--X | | Promise(N+1,W) Acceptor判断新的提案编号较新,因此做出承诺(不再回复小于N+1的提案)
| | X----------->|->|->| | | Accept!(N,V) 此时proposer1对于proposer2的提交号全局编号的变化并不知情,只知道自身之前提交的prepare已经被acceptors promise了,因此发起批准请求。
| | |<-----------|<-|<-| | | Reject(N,V) accept1.2 根据协议,由于acceptors批准p1的请求会违背对于proposer2的承诺,因此拒绝,并返回最能的编号N+1
| | X----------->|->|->| | | Prepare(N+2,V) proposer1发现最新的提案号时N+1,自己的已经实效,因此更新提案号(N+2)重新提案。
.......................

以此类推,proposer1和proposer2的循环由于prepare阶段和accept阶段的交替进行可能无限下去。之所以是活锁是因为如果某个proposer在prepare之后马上accept,则可以解开这个循环。 不过随着节点和并发请求的增多,即使不会出现活锁,也会有大量的资源消耗。

Multi-Paxos

Basic-Paxos之所以出现活锁和计算浪费,是由于basic-paxos有两个阶段,多个proposer会提交多个提案,这些版本在不同阶段中会出现时序上的冲突。

Multi-Paxos为了解决这个问题,设定了一个leader角色。所有的提案,都由leader来发起。同时,由于只有一个proposer(就是leader)来进行提案,避免了编号紊乱的问题,因此可以跳过prepare阶段,直接进行accept:

1
2
3
4
5
6
7
8
9
10
11
Client     Leader       Acceptor     Learner
X---------->| | | | | | Request(V)
| X-------->| | | | | | Request(W)
| | X--------->|->|->| | | Accept!(N,I+1,V) 进行提案,新的轮次是N,编号I+1
| | |<---------X--X--X------>|->| Accepted(N,I+1,V) 有两个请求到达leader,但leader先处理第一个
|<-----------------------------------X--X Response(V) client1 的请求处理完成,之后在提案第二个。
| | X--------->|->|->| | | Accept!(N,I+2,W)
| | |<---------X--X--X------>|->| Accepted(N,I+2,W)
| |<---------------------------------X--X Response(W) 该流程有4步(注意,这里有两个提交,每个提交时4步)
| | | | | | | |
通过leader对时序的管理,省略了prepare步骤,减少了2步消息的传输,并且不会出现活锁。

其中,leader的产生是由选举算法产生(下文讲述)。每一轮选举,N递增。 在此维度上增加I,(在某轮选举中递增的编号)。

Fast-Paxos

在并发提交时,会产生大量的冲突,因此Multi-Paxos增加了独一无二的leader角色负责提案。而Fast-Paxos的设计是,客户端直接提交请求导acceptors,减少一步leader的转发;只有当出现消息冲突时,再由leader来负责协调。 这种方式,非常适合并发冲突不严重时处理。
fast-paxos有两个分支,一个有leader的协调者机制,一个是无leader角色的自动协调机制。
我们先看有leader的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Client  Leader      Acceptor      Learner
| | | | | | | |
| | | | | | | |
X-------------->|->|->|->| | | Accept!(N,I,W) 直接提案
| |<-------X--X--X--X----->|->| Accepted(N,I,W) 提案确认,成为决议
|<------------------------------X--X Response(W) 可以看到,在没有冲突时,该流程有3步,会比multi-paxos减少一步流程。
| | | | | | | |
| X--------------?|-?|-?|-?| | | Accept!(N,I,V) client1 提案V
X----------------?|-?|-?|-?| | | Accept!(N,I,W) client2 提案w
| | | | | | | | |
| | | | | | | | | !! Acceptors disagree on value
| | |<-------X--X->|->|----->|->| Accepted(N,I,V) client1 的提案V被前两个acceptors批准,并通知leader和learner
| | |<-------|<-|<-X--X----->|->| Accepted(N,I,W) client2 的提案W被后两个acceptors批准,并通知leader和learner
| | | | | | | | | 由于acceptors的意见不一致,并且没有一个获得多数派,因此learner不会做处理。
| | | | | | | | | !! Detect collision & recover
| | X------->|->|->|->| | | Accept!(N+1,I,W) leader发现无法没有一个决议形成多数派,那么根据协调算法,指定W作为提案,重新提交。N+1,用来标识更高的优先级。
| | |<-------X--X--X--X----->|->| Accepted(N+1,I,W) acceptors获取到leader的提案,并且编号时N+1(这时client无论再如何重复提交,也无法影响这个决议,因为cleint的提案只能是(N,I+X,X)),N的优先级更高,I增加到多少都无用。
|<--------------------------------X--X Response(W) learner 发现多数派决议,并返回客户端。 这里可以看到,出现冲突时,流程的复杂度时比multi更高。

上述的fast和multi,都有一个leader角色。但leader从哪里来呢,当leader宕机时该怎么产生新的leader呢?这就是分布式中比较有名的选举算法,而fast-paxos的另一个分支—无leader角色的自动协调机制就是为了解决这个问题。 zk的zab协议,以及etcd的raft协议,都是默认采用fast-paxos的这个分支为基础进行leader选举的。

无leader的自动协调,其实就是协调一种固有的机制,那zk为例子,当出现冲突时:

  • 优先选择事务zxid更大的。 zxid是一个64位的数据,前32位是epoch(选举轮次),后32位是事务Id。服务重启时,各个server的zxid相同,只有服务正在运行中leader宕机时,zxid才会不同。通过这个机制,会选择zxid更大的服务器作为leader(zxid最大,数据也最新)
  • 如果zxid相同,优先选择serverId更大的。 serverId是服务器的唯一id,可以区分大小。

通过这种机制,没有leader,也可以合并client和learner:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Client         Servers
| | | | | |
| | | | | | !! Concurrent conflicting proposals
| | | | | | !! received in different order
| | | | | | !! by the Servers
| X--------?|-?|-?|-?| Accept!(N,I,V) client2 提案V
X-----------?|-?|-?|-?| Accept!(N,I,W) client1 提案W
| | | | | |
| | | | | | !! Servers disagree on value
| | X<>X->|->| Accepted(N,I,V) 前两台servers批准了V,并向其它servers发送提案。
| | |<-|<-X<>X Accepted(N,I,W) 后两台servers批准了W,并想其它servers发送提案。
| | | | | |
| | | | | | !! Detect collision & recover
| | X<>X<>X<>X Accepted(N+1,I,W) servers都收到到其它servers的提案,由于没有形成大多数,因此进行自动协调,由于N和I相同,因此假设根据后两台servers的机器编号更大,修正提案,批准后两台机器的提案W。
|<-----------X--X--X--X Response(W) W提案形成大多数,形成决议,回复W。
| | | | | |

这里只是一个大体的流程,实际上X<>X<>X<>X Accepted(N+1,I,W) 这一步远比流程中复杂的多,如果有机会再进行说明吧。

“互联网医院”和“医院 互联网+”

Posted on 2019-03-10 | Edited on 2019-06-18 | In 随笔

最近每次经过天府大道,都路过一个建筑工地招牌,上面写着:“今天,你互联网+了吗?”。随着目前国内经济的低迷,以及整体互联网行业的不给力,计算机产业都转向瞄准了传统行业,进行行业技术升级,所谓的“供给侧改革”。医疗也不例外,医院的app、小程序、公众号雨后春笋般涌现,确实给患者带来了很多的便利,而这些都是“互联网+”的体现。
不过最近国家又发出来一个政策,“互联网医院”(其实这个政策几年前就有,在银川试点,只不过后来被叫停,去年重启),而这有何“医院 互联网+”有什么区别呢?

Read more »

《流浪地球》电影观后感

Posted on 2019-02-06 | Edited on 2019-06-18 | In 其它

header
和家人观看了最近很火的《流浪地球》,用我爸的话说,没想到中国的科幻电影可以拍的这么好。不得不说,在这个只要舍得投入就能请到国外大牌特效团队的现在,一个好故事好剧本并能加工成电影,作者刘慈欣和导演郭帆太难得可贵。虽然看影评有些吐槽说两个小年轻开始有些烦人,故事有些主旋律,但我觉得这对整个电影起到画龙点睛的作用。

地球需要流浪2500年,100代人。这是什么概念,从2500年前到现在,地球的文明发生了多少变革,谁能知道这流浪的2500年见,在这个资源匮乏、生活条件恶劣的时代,人类的文明又将发生什么走向?是疯狂麦克斯的野蛮暴力,还是十二只猴子中的等级森严,亦或是云图中的回归原始?电影都没有讲述。电影讲述的是流浪中众多危机的一个篇章,也是几个年轻人成长的故事。虽然他们一开始有些脑残,看着着实气人,但地球的未来终归要交到他们手中,而欣慰的是,无论是初期叛逆暴躁的的户口,胆小怕事的朵朵,耍滑头的Mike,以及后面遇到的一一,他们在上一代人的影响下,见证了牺牲,获得了勇气,懂的了责任,以及最重要的-看到了希望。他们也都成长,成熟,能够担当起重任,而这,也就是文明的真正精髓:传承!“贝加尔湖的冰可能要2000年后才能化成水”,“没事,我还有孩子,还有孩子的孩子”。

电影没有像其它灾难电影一样,讲述高官贵富为了自身安全产生无谓的勾心斗角,而是时时刻刻表现着全人类为了文明的延续做出的牺牲和努力。 当地球没有希望逃离木星时,人类做的选择是让领航船抛弃地球,独自奔向新的家园。而当最后的希望出现时,不同人种不同国家的人又可以聚在一起,创造最终的奇迹。 这让我想到了巴别塔的故事,神明担心人类团结起来过于强大,于是让人类只能说不同的语言,不能沟通,而木星,别称朱庇特,正是神话中最强大的神明的名字(虽是不同文化体系的)。而正式电影的第一个镜头(不算前面的前奏),就是刘培强拿起来一个同声翻译器,笑:)。 当真正的天灾出现时,面对太阳的衰亡,面对木星的引力,人类是多么的无能和渺小;但当人类没有了语言的隔阂,团结起来,却又可以克服重重困难,虽是主旋律,但又何妨,这才是科幻电影,应该去向世人表达的。

hystrix使用和问题分析

Posted on 2019-02-02 | Edited on 2019-06-18

笔者公司使用的是k8s在搭建公司的微服务框架,没有使用java系的spring cloud。在rpc框架选择上,为了降低他人的学习成本,没有使用thrift、grpc等rpc框架,而是使用了以http协议为主的“伪rpc框架”feign。
当前还是使用良好,不过由于一些业务问题,单一的http协议在网络调度复杂的微服务框架中,还是会带来一些调度问题。因此最近引入了hystrix,来提升整体系统网络调用上的健壮性。

首先介绍一下Hystrix,因为github主页的原意:
Hystrix是一个延迟和容错库,旨在隔离对远程系统、服务和第三方库的调用,在复杂的分布式系统中,故障是无法避免的,它可以阻止级联故障,并提供整体系统的容错能力。

Hystrix提供如下的能力:

  1. 延迟和容错能力 阻止级联故障,优雅的降级,快速失败快速回复。基于线程和信号量的隔离,并可以熔断。
  2. 实时操作:实时监控并变更配置。 拥有快速的反馈机制 报警->做出决定->修改配置->查看变更结果。
  3. 并发:并行执行,并发获取请求缓存。通过请求折叠自动批处理。

关于Hystrix的使用,github的wiki已经介绍的够详细了,这里列出一些我们使用的要点,和遇到的问题。

Read more »

Druid连接池源码分析

Posted on 2019-01-12 | Edited on 2019-06-18

概要

对于连接池,有三个重要逻辑:获取连接 ,创建连接,维持连接 以及回收连接。
同时由于涉及到占用数据库的连接资源,因此连接池需要严格维护几个数字:

  • maxPoolCount:也可以叫maxActiveCount连接池的最大活跃连接数量。
  • minPoolCount:也可以叫minIdleCount,连接池的最少保持空闲连接数量。

获取连接

druid的连接池,我们先看下几个主要的城边变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// stats 这些都是统计使用,暂时忽略
private volatile long recycleErrorCount = 0L;
。。。。。
// store
// 真正的连接池存储
private volatile DruidConnectionHolder[] connections;
// 当前的连接数量
private int poolingCount = 0;
// 当前活跃的链接数量
private int activeCount = 0;
// 当前废弃的链接数量
private long discardCount = 0;
......
//
private DruidConnectionHolder[] evictConnections;
private DruidConnectionHolder[] keepAliveConnections;

在DruidDataSource的父类DruidAbstractDataSource中,也有几个重要的属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
// 初始化容量
protected volatile int initialSize = DEFAULT_INITIAL_SIZE;
// 最大连接数
protected volatile int maxActive = DEFAULT_MAX_ACTIVE_SIZE;
// 最少空闲连接数
protected volatile int minIdle = DEFAULT_MIN_IDLE;
//最大空闲连接数,已经废弃
protected volatile int maxIdle = DEFAULT_MAX_IDLE;
// 最大等待时长
protected volatile long maxWait = DEFAULT_MAX_WAIT;
protected int notFullTimeoutRetryCount = 0;
....
// 全局锁
protected ReentrantLock lock;
protected Condition notEmpty;
protected Condition empty;
// 活跃连接锁
protected ReentrantLock activeConnectionLock = new ReentrantLock();

druid本质上是一个生产消费者模型,因此其中的lock是druid最重要的一个锁,下面的empty是lock的两个消费者condition,notEmpty是生产者condition。该lock可以根据配置生成公平锁或非公平锁,默认是非公平,效率较高。

看代码能发现,除了一些统计数据使用Atomic的cas实现之外,大部分的关键数据和count的变更都是依赖lock实现线程安全的。

现在是关键的getConnection()的实现,参见getConnectionInternal():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
//一些常规检测
....
for (boolean createDirect = false;;) {
//缺省情况下,一个DruidDataSource会使用两个线程分别用于创建连接和销毁或检测连接。
//在分库分表的某些场景,可能需要数百甚至数千个数据库,因此会创建大量的线程。这里暂时只讲解普通连接。
....
}

try {
//全局锁 获取连接
lock.lockInterruptibly();
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}
...
try{
// 增加连接数
connectCount++;
//根据是否有超时时间获取连接
if (maxWait > 0) {
holder = pollLast(nanos);
} else {
holder = takeLast();
}
}finally{
// 释放全局锁
lock.unlock();
}

我们在看takeLast()方法,本质上是一个生产-消费者模型的消费者,获取连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 获取连接,注意,这时线程还持有之前获取的的全局lock,因此不会出现并发问题。
DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
try {
// poolingCount就是可用连接数
while (poolingCount == 0) {
//池子里没有,等待连接,唤醒empty
emptySignal(); // send signal to CreateThread create connection

....
try {
// notEmpty等待,释放lock锁,等待唤醒
notEmpty.await(); // signal by recycle or creator
} finally {
notEmptyWaitThreadCount--;
}
notEmptyWaitCount++;
...
}
}...
// decrement poolingCount;
decrementPoolingCount();
// 清除connections中相应的连接。
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;

return last;
}

创建连接

在init时会创建初始的连接,这里暂时不表。
另一个是在获取连接的时候发现连接不够而创建,参见emptySignal()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void emptySignal() {
if (createScheduler == null) {
empty.signal();
return;
}

if (createTaskCount >= maxCreateTaskCount) {
return;
}

if (activeCount + poolingCount + createTaskCount >= maxActive) {
return;
}

createTaskCount++;
CreateConnectionTask task = new CreateConnectionTask();
this.createSchedulerFuture = createScheduler.submit(task);
}

有个容易混淆的是,在emptySignal()这个方法中,看起来是做了一些判断,然后创建了创建连接的任务交给createScheduler执行,但普通使用情况中,createScheduler=null,所以直接就返回了。

那我们看看普通场景下真正创建连接的producer:CreateConnectionThread,而不是schedule使用的CreateConnectionTask。CreateConnectionThread在连接池初始化时创建,在连接池销毁时销毁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public void run() {
initedLatch.countDown();
long lastDiscardCount = 0;
int errorCount = 0;
//循环检测
for (;;) {
// addLast
try {
// 被唤醒时,全局加锁
lock.lockInterruptibly();
} catch (InterruptedException e2) {
break;
}
producerStartCount++;

long discardCount = DruidDataSource.this.discardCount;
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;

try {
boolean emptyWait = true;

if (createError != null
&& poolingCount == 0
&& !discardChanged) {
emptyWait = false;
}

if (emptyWait
&& asyncInit && createCount < initialSize) {
emptyWait = false;
}

if (emptyWait) {
// 必须存在线程等待,才创建连接
if (poolingCount >= notEmptyWaitThreadCount //
&& !(keepAlive && activeCount + poolingCount < minIdle)) {
emptyWaitCount1++;
empty.await();
}

// 防止创建超过maxActive数量的连接
// 这种情况是虽然有现成在等待连接,但连接池已经达到最大连接数,因此await()
if (activeCount + poolingCount >= maxActive) {
emptyWaitCount2++;
empty.await();
continue;
}
}

} catch (InterruptedException e) {
// 一些错误处理
} finally {
// 释放索
lock.unlock();
}
// 如果需要创建物理连接
PhysicalConnectionInfo connection = null;
//创建物理连接
try {
startConnectionCount.incrementAndGet();
connection = createPhysicalConnection();
} catch (SQLException e) {
// 错误处理
} catch (RuntimeException e) {
// 错误处理
} catch (Error e) {
// 错误处理
}

if (connection == null) {
continue;
}

// 存放连接
// 这里需要注意的是,有可能存放失败,说明创建了多于的连接,因此关闭
if (!result) {
JdbcUtils.close(connection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}

errorCount = 0; // reset errorCount
}
}

通过上面分析,当消费者发现无连接可用时,会唤醒生成者,而生产者发现最大连接数已满,再次await。这带来一个性能问题:在高并发情况下,如果连接数已满,新发起的连接请求都会经过这一次消费者await,生产者被notify,然后再次await的场景,产生大量的线程切换,这是一笔不菲的消耗。
为什么不在消费者获取连接的时候去检测是否连接已满呢?只有连接数不足的情况下才去唤醒emptySignal()。

维持连接

Druid使用在启动时会创建一个DestoryTask,放在schedule线程池,默认每分钟执行一次shrink方法来维持连接,同时也会将超时的连接废弃掉,这里我们只看shrink,参见代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public void shrink(boolean checkTime, boolean keepAlive) {
//全局锁
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}

int evictCount = 0;
int keepAliveCount = 0;
try {
if (!inited) {
return;
}
//这里代码比较多,简单来说就是在所有连接中,找出哪些存活时间太久(maxEvictableIdleTimeMillis)需要释放,哪些需要keepalive。
//同时需要判断灵位一个属性,minIdle,最少连接数。

//这里有个关键步骤,将所有要处理的链接从主池子connections里去掉,后续keepalive将或者的连接加回来。
int removeCount = evictCount + keepAliveCount;
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
poolingCount -= removeCount;
}

} finally {
lock.unlock();
}

// 释放存活时间过长的连接
if (evictCount > 0) {
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCount.incrementAndGet();
}
Arrays.fill(evictConnections, null);
}

if (keepAliveCount > 0) {
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();

boolean validate = false;
try {
// 重新验证连接是否可用。
this.validateConnection(connection);
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
// skip
}

// 连接可用,加回主连接池,不可能关闭。
if (validate) {
holer.lastActiveTimeMillis = System.currentTimeMillis();
put(holer);
} else {
JdbcUtils.close(connection);
}
}
Arrays.fill(keepAliveConnections, null);
}
}

值得注意的是,Druid为每种数据库实现了不同逻辑的isValidateConnection()方法,通常检测Valid的方法是调用一个简单的sql语句,但很多数据库都提供了不同的高效手段,比如通过mysql的ping方法实现高效检测。

回收连接

在Connection的生命周期中,使用完以此连接,回调用Connection.close()方法。
Druid自定义DruidPooledConnection实现了close方法,不是关闭连接,而是调用DruidDataSource回收。
这里笔者就不赘述了,大家查看DruidDataSource.recyle()方法即可,逻辑比较简单,只是判断条件比较多。

总结

Druid使用多线程中的常见案例生产者-消费者模型来构建整个并发体系,采用ReenTrantLock作为技术基础,高效的实现了数据库连接池。同时,它还有这丰富的统计模型进行数据库使用数据分析(希望后面有机会分析)。
但有些遗憾的是,Druid对于CAS模型使用较少,锁场景比较多,在高并发情况下线程调度还是会消耗不少的时间。看源代码发现,Druid应该有两套开发思路,一种是在核心的资源调度上用ReetTrantLock,而在统计场景中较多的使用了CAS,没有形成很好的统一。

Rocketmq vs Kafak 产品对比

Posted on 2018-12-11 | Edited on 2019-06-18

目前消息中间件产品比较多了,之前使用的都是阿里的rocketmq.最近阿里云新上线了kafka,于是我做了相关调研,是否需要更换。

我们先来看看相同点:

高吞吐量

两个mq都是通过日志的形式实现消息的持久化,同时rocketmq也支持同步刷盘,提高单机可靠性。

高可用

两个mq都支持同步和异步replica。
rocketmq:master-slaver主备同步。
kafka:较为复杂,实质上是通过partition多follower的方式冗余。

高度负载均衡

发送负载均衡,消费负载均衡

消息消费实时性

rocketmq:长轮询
kafka:之前是短轮询,0.8版本之后支持长轮询
ps:为什么都是长轮询,而不是push?因为消息的消费速度不确定,采用长轮询的方式,可以让消费者控制消息的获取速度。如果用push,可能会出现消费者消费速度较慢mq就将新的消息推送过来的场景。

但同时,由于架构不同,两个mq也是有比较大的区别,下面一一讲起:

kafka架构


zookeeper: 整个消息服务可用性的保障,存储消息服务中的所有topic以及partition的leader-follower的关系。
controller: kafka的partition存在leader-follower即主备的关系。初期是靠zk来选举,但后续发现随着partition数量的增加,导致zk的竞争非常激烈。不得已,kafka做了一个升级,加入了controller角色。 当服务启动时,zookeepr只需要竞选出一个broker作为controller,controller来指定partition的leader-follower关系,并将相应的信息存储到zk中。
broker: 普通的消息服务器,内部有多个partition,可能会升级为controller。


在kafka中,一个topic会被拆分成多个partition实现负载均衡,这些parition就被controller分配到多个broker中(多对多关系),每一个broker中的parition,都对于一个日志文件(加入一个broker中有不同topic的5个paritition,则有5个partition文件),这个文件相互隔离,互不干扰。


同时,每个partition会有多个replica,通过controller来指定leader-follower的关系。

producer在发送消息时,根据策略指定partition或者随机或者其它策略选择partition发送。consumer根据数量是一对一消费还是一个consumer消费多个partition(同一topic)情况下。需要主要的是,一个partition同时只能被一个consumer消费,因此如果consumer的数量大于partition的数量,那么是会有consumer收不到消息。

每个partition对应一个write offset和read offset,日志顺序写入,提高消息吞吐率。consumer顺序读消息,通过不同的策略移动read offset(比如获取消息就更新offset,或者commit消费成功之后再移动),即使consumer挂掉,新的也会沿着上次的offset读取消息。

rocketmq架构

再看下rocketmq的基本架构图:


介绍一个这几个角色
name server: rocketmq自己实现的一套服务发现和注册服务,每个nameserver的机器互相不会通信,但每个服务器都会和nameserver通信。
master-slaver:主备服务器,一个master可能对应多个slaver。producer和consumer都通过nameserver找到相应的master工作,当master挂掉之后,consumer只可以从slaver读消息,producer不能再向master写消息,但会通过负载找寻其它可以写入的master。 master和slaver的服务器是初始配置好的,不会由于宕机而动态切换。


rocketmq的producer也是通过各种策略将消息分发到不同的服务器中,但与kafka不同的是,rocketmq的每个服务器上,都只有一个日志文件,叫做commitlog,多个topic的消息会同时写入到一个commitlog中。那么它是怎么做到区分不同消息的呢?这就需要一个新的模块:consume queue。

发送消息时,会在消息中注明是哪个topic。写入日志时,broker中的所有topic顺序写入commitlog,但会基于配置分化出不同的consumer queue,每个queue对应一个read offset,剩下的就和kafka相同了。

如果queue较多,这不还是随机读取了吗?rocketmq 使用pagecache技术,类似于计算机系统的swapcache来解决这个问题,多个queue对应多个cache区间,所以rocketmq的使用时要多占用一些内存的。

对比总结

通过上面的论述就可以发现问题,理论上,kafka的吞吐量峰值会更高,因为在日志刷盘时,kafka是同时写入多个文件,假如服务器有多个硬盘,会有多套IO可以并发刷盘,而rocketmq只有单个。同时,kafka的极限可靠性也比rocketmq强,单说broker,不考虑负载的情况下,即使broker只剩下一台,kafka依然可以保障运行,但rocketmq的master全部挂掉之后producer就无法写入消息来,只能人工介入。

但rocketmq同样有优势,第一就是架构简单,没有重型的zookeeper和选举策略,也没有controller这种控制角色,所有的信息更多的是基于配置的形式完成,虽然没有那么自动,但后期的维护和迭代升级也会更方便,未知的和不确定性的性能瓶颈会更少。
kafka是基于日志系统诞生的产品,关注的是高吞吐量,而rocketmq是阿里在电商和更多复杂业务中孕育的互联网化产品,提供更多的服务功能,比如事务消息、定时消息等等,而且基于它的架构,当topic数量较大时也不会产生什么问题,但kafka就会急剧的性能下降。因为kakfa的每个parition对应一个文件,topic越多,partition越多,每个broker要并发写入的文件也会更多,达到一定阈值,自然负载就下来了。

STM vs CAS

Posted on 2015-01-08 | Edited on 2019-06-18

学习了几天的clojure,接触比较多的一个名词,就是STM(software transactional memory),软件事务内存。

CAS

说stm之前,先说说跟STM比较相似的CAS

在多线程领域,并发控制一直是个大难题。使用互斥锁虽然可以隔离共享变量的访问,但是一个资源同时只能被一个线程占用的时候,并发性能就会急剧下降。在java中,ReadWriteLock读写分离锁,ReentrantLock增加锁的伸缩灵活性,都是种治标不治本的方式。

于是就诞生了基于乐观索概念,通过版本(标识号,version,多种形式)控制数据访问的的无锁并发。java中的CAS就是基于此原理。

STM

软件事务内存也是基于乐观锁的一种实现形式。说到事务,一般想到的就是数据库,而stm的事务内存,跟数据库中mvcc非常像。只不过stm是在内存中实现的而已。

它相比CAS有什么优点:

更灵活。当处理复杂逻辑时,简单对象肯定无法满足需求,java中需要借助于AtomicReference,通过对象的比较实现CAS,那么在编码的时候,就需要知道,哪些对象可能出现并发危险。STM却可以将复杂的函数逻辑放到事务中,不用关心哪些数据会出现冲突,这和数据库的理念类似。

但这里有几个问题,需要在以后的学习中去解答:

  • 因为是基于内存的版本实现,所以在高并发中,内存消耗会很高。数据库可以使用这种方式,因为数据库的并发连接本身就不会多,而且数据库是基于硬盘的存储,消耗高也是可以接受的。而stm如果在服务器中应用,在高并发模式下,会消耗大量的内存来处理事务了。 所以stm更适合cpu密集型的应用,比如Storm?
  • 既然STM这么强大,为什么java没有应用。做不到,不屑于做,还是发现有隐患而没有去做? 首先不可能是做不到,因为clojure也是基于jvm虚拟机的,干儿子都做得到,凭什么java这个亲儿子做不到。那么为什么java没有做呢?

慢慢研究吧。

基于spring websocket开发聊天功能的研究实战

Posted on 2014-12-05 | Edited on 2019-06-18 | In web

上一篇中我简单说了一下我们项目的构想,其中有一个环节是需要开发一个web聊天功能。这个功能是基于spring4.0 新推出的对websocket支持,和SOckJS以及Stomp开发的,这里简单讲解一下。

Read more »

springmvc 自定义异常处理 无法解析spring内部异常的分析

Posted on 2014-11-26 | Edited on 2019-06-18 | In web

一直在使用springmvc来当做后台应用的controller。最近发现一个问题,就是客户端做了一次数据处理,报了500异常,但是再服务端却没有任何异常输出。尽管我做了自定义异常处理:

Read more »
12
zhaohaifeng

zhaohaifeng

14 posts
5 categories
28 tags
© 2019 zhaohaifeng
Powered by Hexo v3.9.0
|
Theme – NexT.Gemini v7.1.0