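Three key points: distributed lock, election by sequence order, and watcher-based notification.
Main election flow
1. Each LeaderLatch instance taking part in the election creates an ephemeral sequential child node under {latchpath} in ZooKeeper, e.g. {latchpath}/latch-{seq}. The node data is the participant id, which defaults to the empty string ("".getBytes()).
2. Once the node has been created, the background callback reads all children of {latchpath} and sorts them by {seq} in ascending order. If this instance's node has the smallest {seq}, the instance is the master and the isLeader listener is invoked. Otherwise it has lost this round and becomes a follower: it sets a watcher on the node whose {seq} immediately precedes its own. When that node is deleted (a NodeDeleted event), the follower re-reads the children and checks again; if the node is already gone when the watch is being set (result code NONODE), the follower re-runs the election through reset().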
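The decision in step 2 can be sketched without ZooKeeper. The class below is a minimal illustration, not Curator's code: `LatchOrderSketch`, `sequenceOf`, and `decide` are hypothetical names, and the `_c_<id>-` prefix in the last call is an assumption about what protected-mode node names look like.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class LatchOrderSketch {
    // Name marker taken from the article's examples ("latch-{seq}").
    static final String LOCK_NAME = "latch-";

    // Returns the text after the last "latch-" marker, i.e. the sequence suffix.
    public static String sequenceOf(String nodeName) {
        int i = nodeName.lastIndexOf(LOCK_NAME);
        return i >= 0 ? nodeName.substring(i + LOCK_NAME.length()) : nodeName;
    }

    // Returns "LEADER", "WATCH <predecessor>", or "RESET" when our node is missing.
    public static String decide(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children);
        // Sequence suffixes are zero-padded, so string order equals numeric order.
        sorted.sort(Comparator.comparing(LatchOrderSketch::sequenceOf));
        int ourIndex = sorted.indexOf(ourNode);
        if (ourIndex < 0) return "RESET";
        if (ourIndex == 0) return "LEADER";
        return "WATCH " + sorted.get(ourIndex - 1);
    }

    public static void main(String[] args) {
        List<String> children = List.of("latch-0000000012", "latch-0000000010", "latch-0000000011");
        System.out.println(decide(children, "latch-0000000010")); // LEADER
        System.out.println(decide(children, "latch-0000000012")); // WATCH latch-0000000011
        System.out.println(decide(children, "latch-0000000099")); // RESET
        // Assumed protected-mode names: ordering ignores the prefix.
        List<String> prefixed = List.of("_c_abc-latch-0000000002", "_c_zzz-latch-0000000001");
        System.out.println(decide(prefixed, "_c_abc-latch-0000000002")); // WATCH _c_zzz-latch-0000000001
    }
}
```

Sorting on the suffix rather than the full name matters as soon as node names carry a prefix, because otherwise the prefix would decide the order instead of the sequence number.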
1. LeaderLatch#start: start the election
public void start() throws Exception
{
    // Atomically move the state from LATENT to STARTED; this fails if the LeaderLatch was already started
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
    // startTask holds the Future returned by AfterConnectionEstablished.execute, which runs the
    // Runnable below on a single-thread executor once the ZooKeeper client connection is established
    startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
    {
        @Override
        public void run()
        {
            try
            {
                // Run the actual election logic
                internalStart();
            }
            finally
            {
                startTask.set(null);
            }
        }
    }));
}
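The compareAndSet guard at the top of start() is what makes a second call fail. A minimal stand-alone sketch of that pattern follows; `StartGuardSketch` and `current()` are hypothetical names, and it throws IllegalStateException directly, which is the exception Guava's Preconditions.checkState raises.

```java
import java.util.concurrent.atomic.AtomicReference;

public class StartGuardSketch {
    public enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    // Only one caller can move LATENT -> STARTED; every later call is rejected.
    public void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
    }

    public State current() {
        return state.get();
    }

    public static void main(String[] args) {
        StartGuardSketch latch = new StartGuardSketch();
        latch.start();
        try {
            latch.start();
        } catch (IllegalStateException e) {
            System.out.println("second start rejected: " + e.getMessage());
        }
    }
}
```

Because the check and the transition are a single atomic operation, two threads racing on start() cannot both pass the guard.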
2. LeaderLatch#internalStart
private synchronized void internalStart()
{
    // Double-check that the state is still STARTED
    if ( state.get() == State.STARTED )
    {
        // The connection is established at this point. Register a connection-state listener that
        // handles reconnect (re-enter the election via reset()), suspension (set leadership to false)
        // and loss (set leadership to false)
        client.getConnectionStateListenable().addListener(listener);
        try
        {
            // Core election logic, reused by the other code paths
            reset();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            log.error("An error occurred checking resetting leadership.", e);
        }
    }
}
3. LeaderLatch#reset: the reusable core election logic
void reset() throws Exception
{
    // Entering the election: start out as non-leader
    setLeadership(false);
    // Clear our latch node path; setNode also deletes the previous latch node, if there was one
    setNode(null);
    BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if ( debugResetWaitLatch != null )
            {
                debugResetWaitLatch.await();
                debugResetWaitLatch = null;
            }
            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                // Remember the path of the node that was just created
                setNode(event.getName());
                if ( state.get() == State.CLOSED )
                {
                    // The latch was closed in the meantime, so remove the node again
                    setNode(null);
                }
                else
                {
                    getChildren();
                }
            }
            else
            {
                log.error("getChildren() failed. rc = " + event.getResultCode());
            }
        }
    };
    // Create an ephemeral sequential node under latchPath in the background;
    // the callback above runs with the result
    client.create()
        .creatingParentContainersIfNeeded()
        .withProtection()
        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
        .inBackground(callback)
        .forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
4. LeaderLatch#getChildren: read the children of the latch path, then identify the master and register the watch
private void getChildren() throws Exception
{
    BackgroundCallback callback = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
            {
                // Children fetched successfully: identify the master and register the watch
                checkLeadership(event.getChildren());
            }
        }
    };
    // Fetch the children of the latch path in the background and trigger the callback
    client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
}
5. LeaderLatch#checkLeadership: identify the master and register the watch
private void checkLeadership(List<String> children) throws Exception
{
    if ( debugCheckLeaderShipLatch != null )
    {
        debugCheckLeaderShipLatch.await();
    }
    // Path of the latch-{seq} node this instance created; it was stored by
    // setNode(event.getName()) in the create callback
    final String localOurPath = ourPath.get();
    // Sort the children by seq in ascending order
    List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
    // Position of our node in the sorted list
    int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
    if ( ourIndex < 0 )
    {
        log.error("Can't find our node. Resetting. Index: " + ourIndex);
        // Our ephemeral latch-{seq} node is missing, so re-run the election
        reset();
    }
    else if ( ourIndex == 0 )
    {
        // Smallest seq: this instance has won the election
        setLeadership(true);
    }
    else
    {
        // Otherwise this instance is a follower and must notice when it can take over
        String watchPath = sortedChildren.get(ourIndex - 1);
        // Watch only the node immediately ahead of ours. This keeps the number of watcher events low,
        // and because the smallest seq always leads, this instance cannot become master while its
        // predecessor still exists
        Watcher watcher = new Watcher()
        {
            @Override
            public void process(WatchedEvent event)
            {
                if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
                {
                    try
                    {
                        getChildren();
                    }
                    catch ( Exception ex )
                    {
                        ThreadUtils.checkInterrupted(ex);
                        log.error("An error occurred checking the leadership.", ex);
                    }
                }
            }
        };
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                {
                    // previous node is gone - reset
                    reset();
                }
            }
        };
        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
        // Register the watcher on the predecessor node
        client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
    }
}
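To see why watching only the predecessor is enough, the chain can be simulated in memory. This is a rough sketch under simplifying assumptions: sequence numbers stand in for node names, a sorted set stands in for the children of the latch path, and `PredecessorWatchSketch`, `join`, `check`, and `delete` are hypothetical names rather than Curator API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class PredecessorWatchSketch {
    // Live sequence numbers under the latch path (stand-in for the ZooKeeper children).
    private final TreeSet<Integer> live = new TreeSet<>();

    public void join(int seq) {
        live.add(seq);
    }

    // Counterpart of checkLeadership for one participant:
    // lead if nothing is ahead of us, otherwise name the predecessor to watch.
    public String check(int seq) {
        Integer prev = live.lower(seq);
        return prev == null ? seq + " leads" : seq + " watches " + prev;
    }

    // Deleting a node wakes only the participant watching it (its successor),
    // which then re-checks. Returns the re-check results (zero or one entry).
    public List<String> delete(int seq) {
        Integer watcher = live.higher(seq);
        live.remove(seq);
        List<String> events = new ArrayList<>();
        if (watcher != null) {
            events.add(check(watcher));
        }
        return events;
    }

    public static void main(String[] args) {
        PredecessorWatchSketch latch = new PredecessorWatchSketch();
        latch.join(1);
        latch.join(2);
        latch.join(3);
        System.out.println(latch.delete(2)); // [3 watches 1]
        System.out.println(latch.delete(1)); // [3 leads]
    }
}
```

Each deletion triggers at most one re-check, whereas a design in which every follower watched the leader would wake all followers whenever the leader went away.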


