JGroups在集群环境下还是有一定的用处的,比如基于JGroups & memcached开发分布式cache层,集群任务的切分和计算结果的合并,JBossCache组件…
XML-协议栈 – 以一个JGroups标配的tcp.xml为例子
1.读取XML文件<config>下<ELEMENT>集合,每一个<ELEMENT>初始化成一个ProtocolConfiguration
– ProtocolConfiguration -> protocol_name | MAP<key, value>
这样组成一个LIST<ProtocolConfiguration>
——
2. ProtocolConfiguration参数处理
a. 迭代MAP每一个key-value处理value,如果value中能找到正则式${([^\}]+)}中的group(1)则抽取出来处理
b. 抽取出所有的group(1),如果满足正则([^:]+):([^:]+)则将其group(1)和group(2)分别抽取出来,否则整个
当成一个group(1)
c. 如果group(1)满足[([^,]+),]*([^,]+)则抽取所有的组迭代这些值调用System.getProperty(value)如果不为空
退出迭代,如果在这个过程中根据group(1)得到的值为空则使用group(2)来代替,这层取得的值如果不为
空则其上一层的group(1)使用这个值代替,否则将此key-value从ProtocolConfiguration属性中删除
——
3.构建协议栈ProtocolStack
a. LIST<ProtocolConfiguration>到LIST<Protocol>的处理
迭代ProtocolConfiguration根据ProtocolConfiguration.protocol_name初始化对应的Protocol类,再根据
ProtocolConfiguration.properties对Protocol的属性进行设置,其具体步骤是首先获取此Protocol过时的
属性从properties里面移除,再获取此Protocol里面可以设置的属性如果properties里面有则进行设置,
最后如果properties还有多余的映射则会抛异常。初始化一个LIST<Protocol>生成的Protocol加到此LIST中
b. LIST<Protocol>验证
第一步验证LIST<Protocol>是否有重复,再就是根据每层协议需要上层和下层提供的服务到其上面的协议
和其下面的协议去找是否提供了它需要的服务
迭代LIST<Protocol>进行将Protocol链表连接起来
———–
– Protocol protocol : LIST<Protocol>
– protocol.upProtocol=nextProtocol
– nextProtocol.downProtocol=protocol
———–
c. 协议栈ProtocolStack本身也是一个Protocol,其维护了两个引用top_prot和bottom_prot,其中top_prot为
STREAMING_STATE_TRANSFER,bottom_prot为TCP,并且top_prot的upProtocol设置为ProtocolStack。
再将ProtocolStack倒序过来依次调用Protocol.init来初始化协议。
更多详情见请继续阅读下一页的精彩内容: http://www.linuxidc.com/Linux/2013-10/91114p2.htm
JGroups 的详细介绍:请点这里
JGroups 的下载地址:请点这里
相关阅读:
JGroups入门教程 http://www.linuxidc.com/Linux/2013-10/91114.htm
TP属性…
——————————-
Address local_addr
1.在协议层启动时,TP层首先做的就是获取local_addr,如果目前local_addr为null则触发事件
Event.GET_LOCAL_ADDRESS给上层等待上层告诉TP层local_addr,再获取此TP和组内机器进行交互的能突破NAT
的物理地址,例如TCP物理地址是由TP属性external_addr, bind_addr, bind_port, port_range联合起来参考组成的,
其逻辑是:如果external_addr不为空则IP是external_addr,否则看bind_addr,如果bind_addr不为空则IP是
bind_addr,否则再使用InetAddress.getLocalHost()做IP;再迭代bind_port到bind_port+port_range如果哪个端口
没有被用则用做PORT。将local_addr->IP+PORT的映射加到logical_addr_cache中去。
2.在DOWN逻辑里面处理MSG事件时既需要发送数据时如果loopback为TRUE时表明走TP需要发送的数据如果是
组内多播或者是发送给自己的则不走网络传输了直接UP这个MSG的备份就可以了,走网络传输的话TCP是接受不
到的,这时候需要判断是不是发送给自己的就是用MSG的目标地址和local_addr来做比较的
3.在发送数据时需要设置MSG的源地址,设置为local_addr
4.在接受到数据后,如果loopback为TRUE且数据是多播且源地址是local_addr则不会UP这个MSG事件
5.在协议启动时如果local_addr为空会触发Event.GET_LOCAL_ADDRESS给上层来获取其值,同理在DOWN逻辑
里面会对Event.SET_LOCAL_ADDRESS事件感兴趣,当监听到这个事件时会设置新的local_addr,同时将新的
local_addr->IP+PORT的映射加到logical_addr_cache中去。
——————————-
Set<Address> members
这是一份维护组内成员地址的数据结构,主要是在处理DOWN逻辑里面当事件Event.TMP_VIEW或者
Event.VIEW_CHANGE进来时会清空当前的members,将新的成员地址列表加入members中,同时会到
logical_addr_cache中标记被移除的成员地址的映射为可删除
——————————-
ReentrantLock connectLock
互斥锁主要用来同步连接过程和断开连接的过程
1. 在处理DOWN逻辑里面当事件Event.CONNECT,Event.CONNECT_WITH_STATE_TRANSFER,
Event.CONNECT_USE_FLUSH, Event.CONNECT_WITH_STATE_TRANSFER_USE_FLUSH进入时以TCP为例会启动
Accepter Thread和Mapper就可以监听组内的连接请求和发送数据到组内机器了,而这个过程需要互斥锁来保护
2.在处理DOWN逻辑里面当事件Event.DISCONNECT进入时以TCP为例会需要关闭Accepter Thread和Mapper释
放连接资源和内存HARD-REFERENCE,而这个过程同样需要互斥锁来保护
——————————-
Executor oob_thread_pool
ThreadFactory oob_thread_factory
BlockingQueue<Runnable> oob_thread_pool_queue
这组线程池配置主要用来处理本地的数据上传,这里会有两种情况
1.本地自己发的多播或者接受者为自己的数据,而且本地希望接受这些数据时&数据是OOB类型数据时会往
oob_thread_pool里面添加一个UP这个MSG的逻辑任务
2.TCP的连接在接受到数据后,如果MSG是OOB类型的会往oob_thread_pool里面添加一个UP这个MSG的逻辑
任务
3.TP层协议在初始化时会启动这个线程池,在销毁时会关闭这个线程池
——————————-
Executor thread_pool
ThreadFactory default_thread_factory
BlockingQueue<Runnable> thread_pool_queue
这组线程池配置主要用来处理本地的数据上传,这里会有两种情况
1.本地自己发的多播或者接受者为自己的数据,而且本地希望接受这些数据时&数据不是OOB类型数据时会
往thread_pool里面添加一个UP这个MSG的逻辑任务
2.TCP的连接在接受到数据后,如果MSG不是OOB类型的会往thread_pool里面添加一个UP这个MSG的逻辑任务
3.TP层协议在初始化时会启动这个线程池,在销毁时会关闭这个线程池
——————————-
TimeScheduler timer
ThreadFactory timer_thread_factory
TimeScheduler实际上就是一个线程池时间任务调度器,和普通的区别在于普通的周期性任务加进来以后执行
的周期是固定的,而它的任务支持任务自己动态的改变自己的执行周期。timer不只在TP层使用,TP层的上层也
可获取这个调度器的引用来调度任务,在TP层的使用主要有
1.执行logical_addr_cache里面失效映射的删除工作
2.bundler需要发送数据时,将发送数据的任务加入
3.who_has_cache这个具有生命周期的SET在ADD元素时都会往timer里面添加一个定时删除的任务
——————————-
Future<?> logical_addr_cache_reaper
在TP层初始化的时候会往timer里面添加一个周期性的时间任务,这个任务的逻辑是删除logical_addr_cache里面
标记删除的ADDRESS->IP+PORT的映射,而添加这个任务返回的Future就是logical_addr_cache_reaper方便协议
销毁时取消这个任务的执行,这个任务添加进来时指定延时和周期都是logical_addr_cache_expiration
——————————-
SocketFactory socket_factory
——————————-
Bundler bundler
这个属性比较重要,用来缓存需要发送的数据,以多个维度来触发发送缓存的数据,Bundler是一个接口,主要
定义了三个行为,当TP层需要发送网络数据时都会把数据提交给bundler然后就返回了,bundler需要发送数据时
则将请求给TCP层做网络发送
—
void start()
void stop()
void send(Message msg)
—
其实现有两种DefaultBundler和TransferQueueBundler介绍下这两个的实现
———-
DefaultBundler 这个以两个维度来保证数据的发送
-Map<SingletonAddress,List<Message>> msgs 用于缓存发给某一个地址MSG链表
-long count 累计目前bundler缓存的数据大小,超过某限制时做发送
-int num_bundling_tasks 目前在timer里面正在执行发送任务的个数
运行逻辑
1.外界加一个Message进来时会看目前count+MSG.length是否大于了限制,如果大于则触发TCP层的发送
否则将MSG加到msgs的映射中,同时累计count
2.外界加一个Message的同时会看num_bundling_tasks是否小于2,如果是则会往timer里面添加一个发送
Msgs里面数据的任务,当这个任务执行完后num_bundling_tasks–
———-
TransferQueueBundler
– BlockingQueue<Message> buffer send进来的MSG都先放在此阻塞队列中
– Map<SingletonAddress,List<Message>> msgs 同上
– Thread bundler_thread 业务线程
– long count 同上
– long max_bundle_timeout 业务线程获取MSG阻塞时间
运行逻辑
1.外界加一个Message进来时往buffer里面放,如果空间已经慢了会造成阻塞
2.Start函数会启动bundler_thread业务线程,其运行逻辑是
– a.如果目前buffer大小为空则永久阻塞读取值,否则休眠max_bundle_timeout获取值
– b.醒来时如果获取不到MSG则回到a,否则判断目前是否可以发送数据了,其判断依据是count
超过限制或者buffer空间用来超过90%或者线程在a过程休眠的时间超过max_bundle_timeout
如果需要发送数据则调用TCP网络层发送msgs缓存的数据,否则只是将取出的MSG加到msgs中
———-
——————————-
DiagnosticsHandler diag_handler
List<ProbeHandler> preregistered_probe_handlers
——————————-
ConcurrentMap<String,Protocol> up_prots
这是一个channel-name到虚拟协议层的映射,虚拟协议层ProtocolAdapter的作用主要是,比如几个JChannel
希望使用一个TP来传输数据,结构图如下
这样在TP接受到数据后必须要到up_prots上根据逻辑地址去找这个消息是属于哪一个协议链表的,然后UP给
这个协议链表,同时如果TP层需要触发什么事件,如果其上面有多个协议链表则需要一个个UP
——————————-
TpHeader header
TpHeader里面只包含一个参数channl-name,由TP层发出去的数据都会加这样一个HEADER,这个值的初始化
是在接受到如下事件时做的: Event.CONNECT,Event.CONNECT_WITH_STATE_TRANSFER,
Event.CONNECT_USE_FLUSH, Event.CONNECT_WITH_STATE_TRANSFER_USE_FLUSH
——————————-
LazyRemovalCache<Address,PhysicalAddress> logical_addr_cache
AgeOutCache<Address> who_has_cache
这个一个迟删除的CACHE,其存储的数据主要是由逻辑地址到物理地址的映射,当TP层需要发送一段数据时
需要使用物理地址建立连接发送数据,先到logical_addr_cache里面找,如果找到了发送数据, 否则UP一个
事件Event.GET_PHYSICAL_ADDRESS上层,然后数据就不管了,数据没有发送,who_has_cache在这个逻辑
里面其的主要作用是业务逻辑可能连续的发送某一地址数据,而上层处理Event.GET_PHYSICAL_ADDRESS需
要时间,则会导致上层的重复计算,所以在发起一个计算前先看who_has_cache里面有没有,如果有则不UP
这个事件
———————————————————————————————————-
TP事件…
Event.GET_LOCAL_ADDRESS
在整个JGROUPS事件体系中Event.GET_LOCAL_ADDRESS只有TP层和JChannel处理了这个事件,JChannel有
属性local_addr,在JChannel启动协议层之前调用UUID.randomUUID随机生成了一段字符串表示这个
JChannel的local_addr。[在调用启动协议层后在TP层的启动逻辑里面如果TP.local_addr是空则会触发一个
Event.GET_LOCAL_ADDRESS事件,然后这个事件一直UP到JChannel层,JChannel的UP逻辑里面遇到这个
事件时返回其local_addr]这个事件发生的概率很低其原因是JChannel生成其local_addr和启动协议栈直接会
触发一个Event.SET_LOCAL_ADDRESS事件参数就是其local_addr,在tcp.xml配置的协议栈里面对这个事件
感兴趣的协议有:TCP, TCPPING, MERGE2, FD_SOCK, FD, VERIFY_SUSPECT, NAKACK, UNICAST, STABLE,
VIEW_SYNC, GMS, STREAMING_STATE_TRANSFER, 在这些协议栈中都持有一个local_addr的属性,在DOWN
这个事件时设置自己的local_addr属性。
—————-
Event.SET_LOCAL_ADDRESS
在上面有提到,TP层会接受到来自于JChannel的Event.SET_LOCAL_ADDRESS,然后设置自己的local_addr
—————-
Event.CONFIG
在协议栈初始化的过程中,由于TP层是最新初始化的,在初始化的过程中,如果参数bind_addr不为空,表示
TP做网络需要绑定这个地址,会封装一个MAP(bind_addr->$bind_addr)作为Event.CONFIG的参数UP协议层,
而对这个事件感兴趣的协议有
FD_SOCK -> 有属性bind_addr,接受Event.CONFIG读取MAP.bind_addr赋给bind_addr
VERIFY_SUSPECT -> 同上
STREAMING_STATE_TRANSFER -> 同上
—————-
Event.GET_PHYSICAL_ADDRESS
在TP层有两处逻辑和Event.GET_PHYSICAL_ADDRESS事件有关,一个是UP一个的DOWN
1.当TP需要向组内一台机器发送数据时,根据其逻辑地址到logical_addr_cache中没有找到物理地址则会UP这个
事件,参数是需要连接机器的逻辑地址
2.TP在处理DOWN逻辑时在处理这个事件时从logical_addr_cache中取物理地址返回
对这个事件感兴趣的协议层主要有
TCPPING -> 主要有两点,一个是需要发送数据的时候触发一个DOWN+逻辑地址到TP层来取逻辑地址的物理
地址,另一点是在接受到来自于TP的Event.GET_PHYSICAL_ADDRES事件请求时会去查找这个逻辑地址的物理
地址,并返回NULL,TCPPING协议是Event.GET_PHYSICAL_ADDRESS在UP方向的终点,其UP方向的起点是TP
层,其在DOWN方向的终点是TP层
GMS -> 需要知道一个local_addr物理地址时触发一个DOWN+逻辑地址到TP层来取物理地址
JChannel –> 同上
—————-
Event.SET_PHYSICAL_ADDRESS
这个事件和Event.GET_PHYSICAL_ADDRESS一一对应的,TP层在发送数据时不知道一个逻辑地址的物理地址
时会UP一个Event.GET_PHYSICAL_ADDRES事件,TCPPING在接受到这个事件后会去找这个逻辑地址的物理
地址,在找到后会DOWN事件Event.SET_PHYSICAL_ADDRESS,TP层在DOWN逻辑里面对这个事件的处理是
将逻辑地址和物理地址的映射放到logical_addr_cache中,对这个事件感兴趣的协议层有
TCPPING –> TCPPING在接受到来自于TP层的地址请求后会做网络操作,在处理MSG事件里面如果MSG数据
是地址信息就会将地址封装到Event.SET_PHYSICAL_ADDRESS里面DOWN给其下面的TP层
—————-
Event.GET_LOGICAL_PHYSICAL_MAPPINGS
TP层在DOWN逻辑里面接受到这个事件后将MAP结构logical_addr_cache内容返回,而发起这个事件的只有
TCPPING层协议
—————-
Event.TMP_VIEW
Event.VIEW_CHANGE
在TP层有维护一个属性members用于保持组内所有机器的地址,在DOWN逻辑里面接受到这两个事件时表示
其上层负责维护组内机器信息的协议发现组内成员发生了变化,则使用新的组内成员的信息更新members,
同时由于和组内的每台机器都会建立连接,则对不再出现的组内的机器断掉和它的连接。对这个事件感兴趣
的协议层有
TCPPING –> 其和TP层一样维护了组内机器地址信息,在接受到这两个信息时设置members
NAKACK –> 其也是维护了组内机器的地址信息,还做了其它事,等分析到这一层…
GMS -> 这一层是事件Event.TMP_VIEW,Event.VIEW_CHANGE的发祥地,当协议层侦测到组内成员变化时会
UP&DOWN这两个事件
STREAMING_STATE_TRANSFER –> 其维护了组内机器的地址,当侦测到这两个事件时,更新members
—————-
Event.CONNECT
Event.CONNECT_WITH_STATE_TRANSFER
Event.CONNECT_USE_FLUSH
Event.CONNECT_WITH_STATE_TRANSFER_USE_FLUSH
TP层在接受到这些事件是表示JChannel调用了connect的连接请求,TP层开始启动连接有关的属性,开放
ServerSocket端口和MAPPER,对这个事件感兴趣的协议栈有
JChannel –> JChannel在调用connect系列函数时触发这些事件,根据CONNECT的需求不同来触发事件,
细节等分析到JChannel再谈,JChannel在建立连接时会指定两种参数flushable&stateable这样就有了4种情况,
JChannel在DOWN这类事件时到GMS层时事件继续DOWN下去,当时会返回NULL给JChannel完成connect函数。
GMS –> GMS在接受到这类事件后会返回NULL给JChannel并且继续DOWN这个事件,同时开始这一层的操作
UNICAST -> 将属性disconnected标记为false,但是只处理Event.CONNECT事件
FD_SOCK -> 在接受到这个事件时会在FD_SOCK层开放一个SERVERSOCK,同时FD_SOCK对Event.CONFIG
事件感兴趣,TP设置的bind_addr用于建立SERVERSOCK,如果没有只好用默认的了,这个SERVERSOCK的端口
如果FD_SOCK配置了start_port则从这个端口开始扫描一直到没有使用的端口,start_port默认是0
TCPPING –> 只是将事件参数cluster_name设置到自己的属性group_addr
—————-
Event.DISCONNECT
TP层在接受到这些事件是表示JChannel调用了disconnect的连接请求,TP层做的事就是关闭用于传输数据的
SERVERSOCK和SOCKET集合,对这个事件感兴趣的协议层有
JChannel –> JChannel在调用disconnect函数后触发这个事件DOWN下去
GMS -> GMS在接受到这类事件后会返回NULL给JChannel并且继续DOWN这个事件,同时开始这一层的操作
UNICAST -> 将属性disconnected标记为true
FD_SOCK -> 关闭在连接建立时触发建立的SERVERSOCKET
—————-
Event.REMOVE_ADDRESS
TP层在接受到这个事件后到logical_addr_cache删除事件参数指定的逻辑地址对应的映射,这个事件只有
JChannel会触发,当JChannel调用init做初始化时或者做set-local_addr时发现其localhost不为空则删掉在
logical_addr_cache里面的映射,触发下事件Event.REMOVE_ADDRESS给TP层
—————-
Event.MSG
MSG事件是TP层的主要工作,TP层就是用来发送数据用的,其发送逻辑是
1.设置MSG的源地址,源地址是用TP.local_addr来表示的既逻辑地址,如果TP上面不是虚拟的协议层则直接
使用local_addr,否则不处理是因为MSG在DOWN到虚拟层时已经设置成各个虚拟层的local_addr
2.如果MSG的目的地址是物理地址则表示这个MSG来自于TCPPING层用于发送信息给初始化的成员列表,
不会使用NAKACK来做传输失败的重传所以将目标地址设置为空
3.如果TP层loopback=true表示来自多播或者传输给自己的MSG上层会接受,默认是TRUE,再看目标地址
是不是多播地址或者目标地址是不是自己,如果需要将数据拷贝一份给自己则做一个拷贝,将MSG封装成
MSG事件UP上去,这个过程是交给线程池去做的,如果是OOB则给OOB线程池,否则给默认线程池,
在处理发送的时候有个逻辑就是有虚拟协议层和没有虚拟协议层的区别处理,如果是有虚拟协议层的会去
TPHEADER里面那虚拟协议层的名称再到up_prots里面拿到虚拟协议层的应用UP上去
4.在3中如果目的地址不是多播地址则处理MSG事件结束,否则还需要将MSG传输给其他机器
5.发送信息,如果使用bundler则只将MSG丢给bundler否则自己做发送操作,发送操作的逻辑是:如果地址
是多播地址则迭代logical_addr_cache拿出所有的物理地址发送消息,否则从logical_addr_cache中根据逻辑
地址去物理地址,如果不存在处理逻辑上面提到过,发送细节在TCP层有提到
—————-
TP.Socket接受数据
TP层会和组内所有的机器都建立连接,这些连接在接受到数据后会给TP,然后将数据封装成MSG,然后将
处理逻辑根据MSG是不是OOB分别提交给不同的线程池,其处理逻辑是只有一个就是屏蔽自己发送给自己
的网络消息
—————-
TP – init & start
1.init主要做的事是启动几个时间任务调度timer, oob_thread_pool, thread_pool,实例化who_was_cache,
logical_addr_cache,如果bind_addr设置了会触发Event.CONFIG
2.start主要做的事是如果使用bundler则初始化,否则来消息了直接发送这样会造成发送时的阻塞,另外如果
local_addr为空会触发Event.GET_LOCAL_ADDRESS给JChannel来获取
1. TCP层参数
reaper_interval 如果需要轮询核查连接是否已经失效则会在MAP层启动一个轮询检查线程,调用每一个连接
的是否失效的验证函数,如果已经失效则会关闭此连接,而这个值的设置有两点影响:
1. 如果设置了则会启动一个轮询线程
2. 轮询线程的轮询间隔时间
——
conn_expire_time 作为判断连接是否已经失效的依据,逻辑为如果此值大于0且当前时间和此连接上次做IO的
时间间隔大于此值则任务连接已经失效
——
bind_addr 用于启动ServerSocket时,如果不为null则启动只监听地址为bind_addr的网卡,否则全部监听
——
external_addr NAT映射外部地址,用于建立连接初写入流的数据,组内机器直接使用这个地址来何其建立
连接,否则可能会因为NAT的阻碍连接建立不了…
——
bind_port 用于启动ServerSocket时指定绑定的端口,在区间[bind_port, bind_port+ port_range]里面迭代寻找
还没被占用的端口
——
port_range 指定端口区间
——
recv_buf_size 接受数据的缓存大小
——
send_queue_size 在发送数据时使用阻塞队列的大小
——
use_send_queues 在发送数据时是否使用阻塞队列
——
send_buf_size 发送数据的缓存大小
——
sock_conn_timeout 创建连接时限制时间
——
tcp_nodelay True – 保证数据包尽快的被发送不管包多大
——
linger (True – time) – 上层调用close时会阻塞time保证余留数据包SEND & ACK
———————–
2.TCP层启动流程
TCP层主要是管理了一个TCPConnectionMap,将上面提到的参数用来初始化一个TCPConnectionMap,当需要发送
数据时将数据交给TCPConnectionMap发送,当TCPConnectionMap接收到数据时回调TCP层的接收数据的函数
————
TCPConnectionMap层数据结构主要如下
– Mapper有三个数据结构
a. MAP<Address, TCPConnection> 维护地址到连接的映射
b. Reaper轮询检查连接是否过期,如果过期关闭连接,并从映射中消除
c. Vector<ConnectionMapListener>维护连接建立和关闭的监控者
– ServerSocket – 暴露给组内机器用于连接这台机器的终端
– Receiver – 当机器接受到数据时回调给这个数据接受器
– Accepter Thread – 用于处理接受外部连接请求的线程
————
TCPConnectionMap.TCPConnection层表示一个连接
– Socket用于接受和发送数据的套嵌字
– Last_access上一次发送和接受数据时间点
– Sender如果TCP希望通过阻塞队列来存放需要发送的数据则会起一个专门用来发送数据的线程SENDER
– ConnectionPeerReceiver专门用来接受数据的线程
————
初始化方式有两种,根据地址的初始化使用在到MAP中取连接没有时,会根据地址主动的去建立连接;根据SOCKET的
初始化使用在TCPConnectionMap的Accepter Thread在接受到连接请求时,需要将这个连接管理起来,属于被动式
a.指定对方地址建立连接
1. 指定连接使用的地址bind_addr,和对方建立连接
2. 使用TCP层和SOCKET有关的参数来设置此SOCKET的属性
3. 将自己的能突破NAT的地址写入到OUTPUT流中
b.给定一个已经建立连接的SOCKET
1. 使用TCP层和SOCKET有关的参数来设置此SOCKET的属性
2. 读取对方的地址,因为上一种方式的CONNECTION在建立之后会往数据流里写入自己的地址+端口
由于有主动也有被动可能会造成两台机器之间保持两个连接,故会在被动那种方式初始化的TCPConnection做一个判断,
看是否需要关闭这个被动建立的连接,判断依据是如果之前这台机器已经建立了连接而且对方的地址比本地NAT地址小
只有当前机器需要发送数据时才会到MAP中去取连接
————
TCPConnectionMap.TCPConnection.Sender主要负责发送一个连接需要发送的数据
– BlockingQueue<byte[]> send_queue指定大小的阻塞队列,超过大小是提交数据线程阻塞
– Thread runner发送数据线程死循环从send_queue中读取数据发送出去
————
TCPConnectionMap.TCPConnection.ConnectionPeerReceiver轮询读取对方发送过来的数据,回调Receiver
– Thread recv读取数据线程,逻辑为死循环,先读取数据长度,在读取完数据,再回调Receiver,
指定数据和对方的地址
TCPPING层属性
boolean return_entire_cache=false;
int max_rank=0;
int rank=0;
boolean is_server=false;
int num_initial_srv_members=0;
long timeout=3000;
int num_ping_requests=2;
int num_initial_members=2;
boolean break_on_coord_rsp=true;
int max_found_members=0;
TimeScheduler timer=null;
View view;
Vector<Address> members=new Vector<Address>(11);
Address local_addr=null;
String group_addr=null;
Set<Responses> ping_responses=new HashSet<Responses>();
PingSenderTask sender=new PingSenderTask();
List<IpAddress> initial_hosts=null;
BoundedList<PhysicalAddress> dynamic_hosts=new BoundedList<PhysicalAddress>(max);
——————————-
boolean return_entire_cache
int max_rank
int rank
这三个值和PING请求数据过来后本地怎么响应有关系,return_entire_cache表示请求过来后是不是需要将本地TP层
的缓存logical_addr_cache发送给请求端,再判断是否发送缓存还有另一个条件就是请求里面有一个参数
return_view_only,这个参数为TRUE说明请求端只需要本地的local_addr->physical_addr,即条件
return_entire_cache&!return_view_only满足时才会发送本地的logical_addr_cache给请求端,在处理前面那段逻辑前
面会有一个判断,即max_rank和rank起作用的地方,比如一个GROUP里面有5个成员,每一个成员根据其加入的先
后顺序会制定其一个INDEX即rank,然后设置一个SIZE居中的值为max_rank,由于请求会发送给全部的成员,只有
rank<=max_rank的成员才会对PING请求做处理
这三个值的设置是在TCPPING层处理事件Event.TMP_VIEW和Event.VIEW_CHANGE时设置的,当接受到这两个事件
时,本地的local_addr到VIEW.member里面找rank,同时根据VIEW.size来设置max_rank和return_entire_cache,
其逻辑是,如果VIEW.size<=10则max_rank=0&return_entire_cache=false,表示SIZE<=10时所有接受到的PING请
求都要处理,当时只返回本地的local_addr->physical_addr;如果VIEW.size<=20则max_rank=SIZE/5
&return_entire_cache=true;其他同VIEW.size<=20只是max_rank最大只能为10
上面介绍的关于return_entire_cache的逻辑来自于TCPPING父类Discovery,在处理事件Event.TMP_VIEW和
Event.VIEW_CHANGE时在TCPPING里面会默认将return_entire_cache=true
——————————-
boolean is_server=false;
int num_initial_srv_members=0;
这个属性表示本地是不是服务端的身份,本地是不是服务器由GMS层的逻辑设置,在发送PING响应时会告诉请求
端本地是不是服务端身份,原因在于请求端在发送请求后会阻塞等待需要的数据全到,而退出阻塞的逻辑里面有
一条是接受到来自服务端身份的响应超过num_initial_srv_members时就可以退出了
——————————-
long timeout=3000;
int num_ping_requests=2;
本地可能会需要组内机器local_addr->physical_addr的映射,有两个请求来源,一个是来自于本地接受到来自于TP
层UP的Event.GET_PHYSICAL_ADDRESS这点在TP层提到过,另一个是来自于上层DOWN的事件Event.FIND_INITIAL_MBRS
和Event.FIND_ALL_MBRS,在接受到这两个事件时表示上层需要TCPPING层去发送PING请求以获取组内机器的地址,
第一种情况跟这两个参数无关,跟第二种有关系,上层在触发这两事件时需要发网络数据,为了控制网络不要过于
频繁,在延时调度一个发送请求任务成功后返回一个FUTURE这样在下次在需要发送请求时看上一个FUTURE是否已
经被执行完了,如果没执行完则放弃这个任务的执行,而timeout/num_ping_requests就是用来指定这个延时的
——————————-
TimeScheduler timer=null; 这个时间任务调度来自于TP层
——————————-
int num_initial_members=2;
boolean break_on_coord_rsp=true;
这两个参数的作用和num_initial_srv_members一样,同是发送PING请求后推出阻塞的条件
num_initial_members表示在发送请求后如果接受到的响应超过这个值时阻塞就可以退出了
break_on_coord_rsp表示在发送请求后如果接受到身份是整个组的协调者发来的响应阻塞就可以退出了
——————————-
int max_found_members=0;
这个属性是用来指定PING请求发出后阻塞会有一个期待的响应条数,当达到这个数目才会退出阻塞,发送请求会有
两种情况分别对应事件Event.FIND_INITIAL_MBRS和Event.FIND_ALL_MBRS,前者条数为num_initial_members后者
是在num_initial_members,max_found_members,VIEW.SIZE里面取最大值
——————————-
View view;
Vector<Address> members=new Vector<Address>(11);
Address local_addr=null;
String group_addr=null;
这四个属性很熟悉了,view表示组内成员的视图,members表示组内成员列表,当事件Event.TMP_VIEW和
Event.VIEW_CHANGE时会更改这两个值,同时在响应PING请求时会将view发送过去
Local_addr表示本地的逻辑地址,在JChannel层DOWN下来的事件Event.SET_LOCAL_ADDRESS里设置
group_addr表示组的名称,在JChannel层DOWN下来的CONNECT系列事件里设置
——————————-
PingSenderTask sender=new PingSenderTask();
这个类用来发送PING请求缓冲用,在TCPPING接受到来自于GMS的事件Event.FIND_INITIAL_MBRS和Event.FIND_ALL_MBRS
会执行发送PING请求的逻辑,为了控制网络数据量,这些发送逻辑都走sender来同步,其作用就是每一个发送任务
都是延时任务,当来一个新任务时如果上一个任务还没执行或者还没执行完则会放弃
——————————-
Set<Responses> ping_responses=new HashSet<Responses>();
类Responses是一个发送PING请求->阻塞->脱离阻塞的同步器,每一次发送PING请求都会实例化一个Responses类
,然后调用sender发送数据,再进入Responses类的阻塞逻辑,其脱离阻塞在于来PING响应以后需要通知这些
Responses而怎么通知就需要一个数据结构来保持所有的Responses,在PING响应来后迭代ping_responses一一通知,
至于能不能脱离阻塞看各自接受到的数据和退出条件能否满足
——————————-
List<IpAddress> initial_hosts=null;
这个属性是TCPPING层乃至整个tcp.xml整个协议栈能运行起来的命门所在,在上面提到的本地可能会需要组内机器
local_addr->physical_addr的映射的两个请求来源都需要将PING请求发送给组内机器,最开始发送给谁从而让组形
成,initial_hosts起的作用就是最开始组还形成时所有的PING请求都会发送给这些机器
——————————-
BoundedList<PhysicalAddress> dynamic_hosts=new BoundedList<PhysicalAddress>(max);
由于组会动态扩展,新加入的机器链表就是dynamic_hosts,这是一个限制长度的链表,其数据来源主要是TCPPING层
在接受到PING请求时,对方会把自己的local_addr&physical_addr发送过来,表示它是组的新机器,如果initial_hosts
里面没有则加到dynamic_hosts,每次发送PING请求时并合initial_hosts,dynamic_hosts一台台发送PING请求
———————————————————————————————————————
TCPPING层事件
Event.MSG
处理来自于TP层的MSG时会取MSG.header(TCPPING.ID),如果取到HEADER表示这个数据是TCPPING层交互的信息
否则继续UP这个MSG,在PINGHEADER里面有四个有用的数据字段
– byte type – 只有GET_MBRS_REQ和GET_MBRS_RSP两种,前者表示PING请求后者表示PING响应
– PingData – TCPPING层交换的原始数据[组内机器UUID –发送者View – 发送者.isServer – 组内机器local_name – 组内机器physical_addr]
– String cluster_name – JChannel.cluster_name,对于PING请求有意义而对于PING响应没有意义
– Boolean return_view_only – 是否只是需要,对于PING请求有意义而对于PING响应没有意义
MSG分为两种GET_MBRS_REQ和GET_MBRS_RSP,由PINGHEADER.type决定
1.GET_MBRS_REQ
首先判断是否需要处理这个PING请求,其处理逻辑是如果max_rank>0再和rank比较,如果rank>max_rank则表示
本地机器由于加入过迟不需要处理这个PING请求,交给[0,max_rank]段的机器去处理
另外为了保险起见取出PINGHEADER.cluster_name和本地group_addr比较,如果不是表示不是来自于同一个组的
信息不处理
来自于机器组装的PING请求数据由以下信息组成[本地local_addr, null, false, 本地逻辑名, 本地physical_addrs],在处理
这个信息时读取里面的local_addr和physical_addr,如果都不为空会DOWN一个Event.SET_PHYSICAL_ADDRESS事件
给TP层加到地址CACHE里面,同时UUID->逻辑名的映射放到本地UUID的CACHE里面,再判断local_addr是否在初始化
地址或者动态地址里面,如果不在则加入到动态地址里面,表示新进来一个成员
最后走发送响应的逻辑,根据本地的return_entire_cache和PINGHEADER.return_view_only来决定需要发送那些数据
,如果return_entire_cache&!PINGHEADER.return_view_only则从TP层将地址CACHE全部取出来,迭代这些数据组装
成PINGHEADER发送出去,其组装成一个响应PINGHEADER发送出去,数据组织形式是[UUID, 本地.view, 本地.isServer,
UUID.逻辑名, 物理地址],如果条件不成立则只发送本地的地址其数据组织形式是[local_addr, 本地.view, 本地.isServer,
local_name, 本地.physical_addr],PINGDATA封装完后,再使用PINGDATA封装成PINGHEADR[TCPPING.ID, PINGDATA],
再经TP层发送数据。
2.GET_MBRS_RSP
此处逻辑的处理和GET_MBRS_REQ对应,其处理的数据就是GET_MBRS_REQ发送的数据,首先从PINGDATA里面
读取UUID和physical_addr,以及UUID.local_name,再DOWN一个Event.SET_PHYSICAL_ADDRESS事件给TP层加到
地址CACHE里面,同时UUID->逻辑名的映射放到本地UUID的CACHE里面
另一个逻辑和ping_responses有关,因为GMS/MERGE2 DOWN下来的事件都会生成一个Responses来同步DOWN线程
和TP层接受数据的OOB线程池里面的线程,来自于OOB的PINGDATA都满足DOWN线程需要的数据后唤醒DOWN线程,
而这个唤醒的同步则是Responses来做的,OOB线程只需要将接受到的PINGDATA迭代ping_responses放到每一个Responses里面,
Responses根据目前接受到的PINGDATA来决定OOB线程是否唤醒DOWN线程,DOWN线程在被唤醒后再根据PINGDATA
来决定其需要的数据都有了,如果还不足则继续睡。
——————————-
Event.GET_PHYSICAL_ADDRESS
3.来自于TP层的Event.GET_PHYSICAL_ADDRESS,当TP要在网络IO时发现某组内机器没有物理地址则会UO此事件,
TCPPING在接受到此时间后发送PING请求,其组装数据格式为[本地local_addr, null, false, 本地逻辑名,
本地physical_addrs],在PINGHEADER.return_view_only为false表示接受到此请求的机器将其CACHE都发送,
迭代初始化地址和动态地址一一发送此请求,接着TCPPING在接受到GET_MBRS_RSP或者GET_MBRS_REQ都会
DOWNEvent.SET_PHYSICAL_ADDRESS事件给TP层,这样TP层就得到想要的数据了
——————————-
Event.TMP_VIEW
Event.VIEW_CHANGE
在TCPPING层维护了view和members两个属性,在接受到这两事件时重置这两个属性,同时这种本地rank,表示
其在组内的INDEX,再就是根据VIEW.SIZE设置max_rank和return_entire_cache,具体上面有提到,处理完后DOWN
这个事件给TP层处理
——————————-
Event.SET_LOCAL_ADDRESS
在TCPPING持有属性local_addr,在接受到这个事件时设置本地local_addr,再DOWN给TP层处理
——————————-
Event.CONNECT
Event.CONNECT_WITH_STATE_TRANSFER
Event.CONNECT_USE_FLUSH
Event.CONNECT_WITH_STATE_TRANSFER_USE_FLUSH
TCPPING层在接受到连接事件时设置本地group_addr属性,其值为cluster_name,其作用是在处理PING响应时读取
PINGHEADER里面的cluster_name和本地是不是一样,是一样才处理,处理完后DOWN给TP层处理
——————————-
Event.DISCONNECT
DOWN给TP层处理
——————————-
Event.BECOME_SERVER
此事件由GMS层DOWN下来,TCPPING层在接受到此事件时将is_server=true表明本地是服务器身份了,对这个事件
感兴趣的协议有
NAKACK – 其处理逻辑同TCPPING层,同样有个is_server属性
——————————-
Event.FIND_INITIAL_MBRS
此事件由GMS层触发,需要TCPPING层给所有组内机器发送PING请求数据,在接受到num_initial_members个数据,
在发送给每一个组内机器的PINGHEADER里面指定return_views_only=false即组内机器将其CACHE全部发回
——————————-
Event.FIND_ALL_MBRS
此事件由MERGE2层触发,需要TCPPING层给所有组内机器发送PING请求数据,在接受到MAX[num_initial_members,
max_found_members, view.size]个数据,在发送给每一个组内机器的PINGHEADER里面指定return_views_only=true
即组内机器将其自己地址发回就可以了
发送逻辑,其操作和Event.FIND_INITIAL_MBRS一样,不同的地方在于初始化的同步器Responses不一样
Event.FIND_INITIAL_MBRS[num_initial_members, break_on_coord_rsp, false]
Event.FIND_ALL_MBRS[MAX, false, true]
参数依次是:出现多少条PING-RSP,得到协调者响应是否退出,处理者是否只返回其本地地址而不返回CACHE
1.根据上述条件+num_initial_srv_members封装一个同步器Responses,将这个同步器Responses加入到ping_responses里面
2.将发送请求任务提交给任务缓存器sender
3.限时timeout阻塞等待组内机器响应,退出阻塞后将Responses从ping_responses里移除
4.Responses同步逻辑
其里面有一个链表用于保持所以PING-RSP数据,当条数超过其期待的时阻塞退出,当接到到的身份为SERVER的数据条数
超过num_initial_srv_members时阻塞退出,当接收到来自协调者的响应时时阻塞退出,同时超时时阻塞退出,返回PING-RSP链表
对于所以进来的PING-RSP也要做一个过滤,如果某UUID之前没有进来过则加入到链表中,唤醒阻塞,如果之前进来过再看是不是
改UUID发送的自己的UUID->物理地址,如果是唤醒阻塞,否则看该UUID是否是协调者,如果是且其之前不是协调者唤醒阻塞
———————————————————————————————————————
TCPPING层逻辑
1.各个层之间交互自己层的信息,信息从最上层DOWN到最下层TCP发送出去,在发送时在MSG里面添加一个HEADER-ID为自己层
的HEADER然后经TCP发送到指定机器后数据UP到对应的层,发送出去的数据MSG的格式为MSG<LIST<HEADER>, BUFFER>,每层
在处理TP层UP上来的MSG时判断是自己这层需要处理了逻辑是从LIST<HEADER>找HEADER-ID是属于自己ID的HEADER如果找到了
则处理,否则继续UP这个MSG。
2.TCPPING层兼任的任务是寻找组内的机器列表,会有三个地方驱动其获取信息,来自于GMS层DOWN下来的Event.FIND_INITIAL_MBRS,
以及MERGE2层DOWN下来的Event.FIND_ALL_MBRS,以及TP层UP上来的Event.GET_PHYSICAL_ADDRESS,当TCPPING层接受到这三
个事件时会发送PING请求数据包给其保持的组内机器,最开始发送的是XML配置的初始化地址,随着本地接受到来自其他机器的PING
请求从而扩大其下次发送PING请求的范围
BARRIER层属性
long max_close_time=60000;
final Lock lock=new ReentrantLock();
final AtomicBoolean barrier_closed=new AtomicBoolean(false);
Condition barrier_opened=lock.newCondition();
Condition no_msgs_pending=lock.newCondition();
ConcurrentMap<Thread,Object> in_flight_threads=Util.createConcurrentMap();
Future<?> barrier_opener_future=null;
TimeScheduler timer;
final Object NULL=new Object();
——————
BARRIER层介绍
BARRIER层主要用来同步由TP层接受的MSG往上UP的线程的同步器,和concurrent包里面的同步器CyclicBarrier功能一样,即一个可重
复使用的开关。不过BARRIER这个协议层同步器控制的粒度比CyclicBarrier细,CyclicBarrier只是在任务逻辑里某一行里充当开关的角
色,一当任务逻辑通过这行语句后面的逻辑正常运行,通不过则阻塞在这条语句处,而BARRIER是同步整个任务流程,是一个实现很
不错的任务整体逻辑控制同步器,其同步的任务逻辑主要是TP层的线程池里面做MSG-UP的线程,然后开关的打开由
STREAMING_STATE_TRANSFER来控制,其控制方式是DOWN事件到BARRIER层,当BARRIER接收事件Event.CLOSE_BARRIER时将
barrier_closed置为FALSE表示开关关闭,接收到事件Event.OPEN_BARRIER时将barrier_closed置为TRUE表示开关打开,这样同步器
协议BARRIER主要有三段逻辑。
——————
UP-MSG
底层TP层UP的MSG事件,需要通过BARRIER层的同步继续UP给消化此MSG的协议层,
为了全程同步UP线程的任务逻辑,将映射current_thread->NULL添加到in_flight_threads好在BARRIER层全程跟踪这个线程,如果目
前barrier_closed为FALSE表示目前开关是开的则调用up_prot.up(EVENT)将事件提交给上层,然后将映射current_thread->NULL从
in_flight_threads移除;如果目前barrier_closed为TRUE表示开关已经关闭了则竞争lock成功后在barrier_opened上等待开关被打开执
行up_prot.up(EVENT)…
可以想象几种情况下in_flight_threads记录的做UP任务的线程的运行状态
1.开关一直是开着的,这所有的线程进来都畅通无阻,运行状态都是RUNNING
2.开关在某一个时间点执行关闭,在关闭前有几个线程已经通过了判断barrier_closed值的语句继续执行up_prot.up(EVENT)逻辑,这
样in_flight_threads保持的里面就有RUNNING和OBJECT_WAIT这样两种线程,前者是在执行开关关闭时刻程序已经通过了开关点,而
后者没有通过
BARRIER是整个UP逻辑的同步,所以它需要处理在第二种情况下的那些状态为RUNNING的线程,BARRIER采取的策略是等待这些
线程执行完,只阻塞那些没能通过barrier_closed值判断点的线程,这和CyclicBarrier是一样的,区别在于其关闭函数不是只设置
barrier_closed值这么简单,其会等待所有状态为RUNNING的线程执行完,而其判断的依据就是in_flight_threads里面记录的线程
都是OBJECT_WAIT的,其考虑的不周之处是在执行up_prot.up(EVENT)里面也可能有阻塞
由于在逻辑块CLOSE_BARRIER里面为了等待in_flight_threads里面RUNNING的线程执行完是做的TIME_WAIT_OBJECT操作,为了避免
所有的RUNNING的线程都执行完了而执行CLOSE_BARRIER的线程还阻塞则,则在UP逻辑里面添加了一段逻辑,在做完UP后,如果
从in_flight_threads还有自己(说明其是RUNNING的)且目前开关关闭其in_flight_threads目前SIZE==0(说明其是最后一个执行完
的RUNNING线程)会唤醒在做CLOSE_BARRIER的线程
——————
CLOSE_BARRIER
这个过程不仅仅只是设置开关为关闭,其做的另一件事是等待设置开关关闭点以前已经通过了开关是否关闭判断的线程继续执行完,
即这个函数返回时表明目前在BARRIER层执行逻辑的线程都是WAIT_OBJECT状态的,么有还在走UP逻辑的了,说绝对了,如果一个
线程已经逃过了开关点而在走UP逻辑时有WAIT_OBJECT操作,则这个函数不会等待这个线程执行完。
这个逻辑实现的基础便是in_flight_threads,这个数据结构记录了所有的进入BARRIER做MSG-UP逻辑的线程,就可以拿到每个线程
的运行状态,在执行设置表示开关关闭语句后拿取每个线程的状态,如果是RUNNING表明其在设置开关时已经逃过去了则走死循环
判断目前in_flight_threads里面没有这种线程了。
在执行完后会往TP层的TIMER里面添加一个延时时间任务,这个时间任务执行的逻辑是执行
OPEN_BARRIER,具体延时多长时间由属性max_close_time说了算,对于返回的FUTURE赋予barrier_opener_future
——————
OPEN_BARRIER
执行barrier_opened.signalAll()将BARRIER层保持的所有在WAIT_OBJECT在barrier_opened的线程唤醒继续逻辑调用up_prot.up(EVENT)
将事件提交给上层,然后将映射current_thread->NULL从in_flight_threads移除
如果目前barrier_opener_future不为NULL说明CLOSE_BARRIER已经提交了一个执行OPEN_BARRIER的任务,而已经执行了OPEN_BARRIER
则告诉TIMER取消这个任务