
[zk] How LeaderLatch Leader Election Works in CuratorFramework


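Three key points: distributed lock, election by sequence order, and the watch mechanism.

Main election flow

1. Each LeaderLatch instance taking part in the election creates an ephemeral sequential child node under {latchpath} in ZooKeeper, e.g. {latchpath}/latch-{seq}. The node's value is the participant id, which defaults to "".getBytes().

2. Once the sequential node has been created, the creation callback reads all children of {latchpath} and sorts their names by {seq} in ascending order. If the node this instance created has the smallest {seq}, the instance is the master and its isLeader listeners are invoked. Otherwise the instance has lost this round and becomes a follower: it registers a watcher on the node with the immediately preceding {seq}. When that node goes away (the watcher receives a NodeDeleted event, or the background read returns NONODE because the node is already gone), the instance tries the election again.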
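
The decision in step 2 can be sketched with plain Java collections and no ZooKeeper connection. This is only an illustration under stated assumptions: `ElectionSketch`, `sequenceOf`, and `decide` are hypothetical names, and the node names are assumed to have the simple form latch-{seq}.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class ElectionSketch {
    // Hypothetical helper: numeric suffix of a name such as "latch-0000000005".
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(nodeName.lastIndexOf('-') + 1));
    }

    // Returns "LEADER", "RESET", or "WATCH <predecessor>" for the given participant.
    static String decide(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(ElectionSketch::sequenceOf));
        int ourIndex = sorted.indexOf(ourNode);
        if (ourIndex < 0) {
            return "RESET";                        // our node is missing: start over
        }
        if (ourIndex == 0) {
            return "LEADER";                       // smallest sequence wins
        }
        return "WATCH " + sorted.get(ourIndex - 1); // watch only the predecessor
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "latch-0000000007", "latch-0000000005", "latch-0000000006");
        System.out.println(decide(children, "latch-0000000005")); // LEADER
        System.out.println(decide(children, "latch-0000000007")); // WATCH latch-0000000006
        System.out.println(decide(children, "latch-0000000009")); // RESET
    }
}
```

The three outcomes correspond to the three branches of checkLeadership discussed in section 5.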

1. LeaderLatch#start: start the election

    public void start() throws Exception
    {
        // Verify that this LeaderLatch instance has not started an election yet (state LATENT) and move it to STARTED
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        // startTask holds a Future that tracks the submitted task. AfterConnectionEstablished creates a
        // single-thread executor and runs the Runnable below once the ZooKeeper client connection is established
        startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    // Run the actual election logic
                    internalStart();
                }
                finally
                {
                    startTask.set(null);
                }
            }
        }));
    }
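
The start-once guard above depends on an atomic compare-and-set. A minimal stdlib-only sketch of the same idea follows; `StartOnceSketch` and `current()` are hypothetical names, and an explicit `IllegalStateException` stands in for what Guava's `Preconditions.checkState` throws when the condition is false.

```java
import java.util.concurrent.atomic.AtomicReference;

final class StartOnceSketch {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    // Only one caller can move LATENT -> STARTED; every later call fails.
    void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
    }

    State current() {
        return state.get();
    }
}
```

Because the check and the transition happen in one atomic step, two threads calling start() at the same time cannot both pass the guard.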

2. LeaderLatch#internalStart

    private synchronized void internalStart()
    {
        // Double-check that the state is still STARTED
        if ( state.get() == State.STARTED )
        {
            // The connection is established at this point. Register a ZooKeeper connection state listener that handles:
            //   reconnected: re-enter the election (reset)
            //   suspended:   set leader back to false
            //   lost:        set leader back to false
            client.getConnectionStateListenable().addListener(listener);
            try
            {
                // Core election logic, reused by other code paths
                reset();
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                log.error("An error occurred checking resetting leadership.", e);
            }
        }
    }
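
The listener behavior described in the comment above can be sketched as a small state machine. This is an assumption-laden illustration, not Curator's listener: `ConnectionStateSketch`, `ConnState`, `becomeLeader`, and `resetCount` are hypothetical, and the mapping of states to actions simply follows the comment (the exact handling of a suspended connection may differ between Curator versions).

```java
final class ConnectionStateSketch {
    // Hypothetical stand-in for the three connection states named above.
    enum ConnState { RECONNECTED, SUSPENDED, LOST }

    private boolean leader;
    private int resets;

    // Stand-in for winning an election round.
    void becomeLeader() {
        leader = true;
    }

    // Stand-in for reset(): drop leadership and re-enter the election.
    void reset() {
        leader = false;
        resets++;
    }

    void stateChanged(ConnState newState) {
        switch (newState) {
            case RECONNECTED:
                reset();          // contend again with a fresh node
                break;
            case SUSPENDED:
            case LOST:
                leader = false;   // leadership can no longer be trusted
                break;
        }
    }

    boolean isLeader() {
        return leader;
    }

    int resetCount() {
        return resets;
    }
}
```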

3. LeaderLatch#reset: the reusable core election logic

    void reset() throws Exception
    {
        // Entering the election: start out as non-leader
        setLeadership(false);
        // Clear our sequential latch node path; this also deletes the previous latch node, if any
        setNode(null);

        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( debugResetWaitLatch != null )
                {
                    debugResetWaitLatch.await();
                    debugResetWaitLatch = null;
                }

                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    // Remember the path of the node that was just created
                    setNode(event.getName());
                    if ( state.get() == State.CLOSED )
                    {
                        // The latch was closed in the meantime: remove the node again
                        setNode(null);
                    }
                    else
                    {
                        getChildren();
                    }
                }
                else
                {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        // Create an ephemeral sequential node under latchPath in the background; the callback above runs when the create completes
        client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }
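
Because the node is created with withProtection(), Curator prepends a "_c_" marker plus a UUID to the node name, so children cannot be ordered by their full names. The sketch below shows one way to order such names by the part after the lock name. It is an illustration of the idea, not Curator's own sorting code: `ProtectedNameSketch` and its methods are hypothetical, LOCK_NAME is assumed to be "latch-" as in the example path from the election flow, and the shortened prefixes in the usage line are made up.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class ProtectedNameSketch {
    // Assumed lock name, matching the latch-{seq} naming described earlier.
    static final String LOCK_NAME = "latch-";

    // Everything after the last occurrence of the lock name, i.e. the sequence digits.
    static String sequencePart(String nodeName) {
        int index = nodeName.lastIndexOf(LOCK_NAME);
        return (index >= 0) ? nodeName.substring(index + LOCK_NAME.length()) : nodeName;
    }

    // Sequence numbers are zero-padded to a fixed width, so comparing them as strings is enough.
    static List<String> sortBySequence(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing(ProtectedNameSketch::sequencePart));
        return sorted;
    }
}
```

For example, sorting `_c_bbbb-latch-0000000002`, `_c_aaaa-latch-0000000010`, and `_c_cccc-latch-0000000001` this way puts the `cccc` node first, whereas sorting by full name would put `aaaa` first.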

4. LeaderLatch#getChildren: read the latch directory, then identify the master and register the watch

    private void getChildren() throws Exception
    {
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    // Children fetched successfully: identify the master and register the watch
                    checkLeadership(event.getChildren());
                }
            }
        };
        // Fetch the children of the latch directory in the background; the callback runs when the read completes
        client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
    }

5. LeaderLatch#checkLeadership: identify the master and register the watch

    private void checkLeadership(List<String> children) throws Exception
    {
        if ( debugCheckLeaderShipLatch != null )
        {
            debugCheckLeaderShipLatch.await();
        }

        // Path of the latch-{seq} node this instance registered under the latch directory;
        // it was stored by setNode(event.getName()) in the create callback
        final String localOurPath = ourPath.get();
        // Sort the children by seq in ascending order
        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
        // Position of our own node in the sorted list
        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
        if ( ourIndex < 0 )
        {
            log.error("Can't find our node. Resetting. Index: " + ourIndex);
            // Our ephemeral latch-{seq} node is missing: run the election logic again
            reset();
        }
        else if ( ourIndex == 0 )
        {
            // Smallest seq: this instance has won the election and is the master
            setLeadership(true);
        }
        else
        {
            // Otherwise this instance is a follower; it watches its predecessor so that it can contend for master promptly
            String watchPath = sortedChildren.get(ourIndex - 1);
            // Watch only the immediately preceding node and re-check leadership when it is deleted.
            // This keeps the number of watcher events low: the smallest seq becomes master, so as long as
            // the preceding seq still exists this instance cannot be the master.
            Watcher watcher = new Watcher()
            {
                @Override
                public void process(WatchedEvent event)
                {
                    if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
                    {
                        try
                        {
                            getChildren();
                        }
                        catch ( Exception ex )
                        {
                            ThreadUtils.checkInterrupted(ex);
                            log.error("An error occurred checking the leadership.", ex);
                        }
                    }
                }
            };

            BackgroundCallback callback = new BackgroundCallback()
            {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                {
                    if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                    {
                        // previous node is gone - reset
                        reset();
                    }
                }
            };
            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
            // Register the watcher on the preceding node
            client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
        }
    }
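
The effect of watching only the predecessor can be seen in a small in-memory simulation. The sketch below assumes each participant is identified by its sequence number alone; `WatchChainSketch` and its methods are hypothetical and do not talk to ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class WatchChainSketch {
    private final TreeSet<Long> sequences = new TreeSet<>();

    void join(long seq) {
        sequences.add(seq);
    }

    // The participant with the smallest sequence is the leader; null if nobody has joined.
    Long leader() {
        return sequences.isEmpty() ? null : sequences.first();
    }

    // The sequence a participant watches: the next lower one, or null for the leader.
    Long watchedBy(long seq) {
        return sequences.lower(seq);
    }

    // Removes a participant and returns the participants whose watch fires.
    List<Long> remove(long seq) {
        List<Long> notified = new ArrayList<>();
        Long successor = sequences.higher(seq); // looked up before the removal
        if (sequences.remove(seq) && successor != null) {
            notified.add(successor);            // at most one watcher is triggered
        }
        return notified;
    }
}
```

With participants 1, 2, and 3, removing 2 notifies only 3, which then watches 1 while 1 stays leader. Removing 1 afterwards notifies only 3, which becomes the leader. No deletion wakes up more than one participant.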