栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ZooKeeper : Curator框架之Leader选举LeaderLatch

ZooKeeper : Curator框架之Leader选举LeaderLatch

Leader选举

在分布式计算中,Leader选举是指指定单个进程作为分布在多台计算机(节点)之间的某些任务的组织者的过程。在任务开始之前,所有网络节点都不知道哪个节点将充当任务的Leader。然而,在执行Leader选举算法之后,整个网络中的每个节点都会选举出一个特定的、唯一的节点作为任务Leader。

LeaderLatch

Curator框架提供了两种Leader选举的实现,LeaderLatch和LeaderSelector,本篇博客介绍LeaderLatch的使用。

测试代码

CuratorframeworkProperties类(提供Curatorframework需要的一些配置信息):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorframeworkProperties {
    // 连接地址
    public static final String CONNECT_ADDRESS = "192.168.31.175:9000";
    // 连接超时时间
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session超时时间
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空间
    public static final String NAMESPACE = "MyNamespace";
    // 重试策略
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
}

LeaderLatchRunnable类(实现了Runnable接口,模拟分布式服务节点参与Leader选举):

package com.kaven.zookeeper;

import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.imps.CuratorframeworkState;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LeaderLatchRunnable implements Runnable{
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的Curatorframework实例,表示不同的分布式服务节点
        Curatorframework curator = getCuratorframework();
        curator.start();
        assert curator.getState().equals(CuratorframeworkState.STARTED);

        // 模拟随机加入的分布式服务节点
        int randomSleep = new Random().nextInt(1000);
        Thread.sleep(randomSleep);

        // 创建LeaderLatch实例(用于Leader选举)
        // curator是Curatorframework实例,用于与ZooKeeper交互
        // "/services/leader"是latchPath,Leader节点会成功创建该节点(其他节点则会失败)
        // 将线程名(Thread.currentThread().getName())作为分布式服务节点的id
        // LeaderLatch.CloseMode.NOTIFY_LEADER表示close模式,即节点进行close操作后的模式
        LeaderLatch latch = new LeaderLatch(curator, "/services/leader",
                Thread.currentThread().getName(), LeaderLatch.CloseMode.NOTIFY_LEADER);
        // 给LeaderLatch实例添加监听器(LeaderLatchListenerImpl实例)
        // EXECUTOR_SERVICE表示执行该LeaderLatchListenerImpl实例的Executor实例
        latch.addListener(new LeaderLatchListenerImpl(latch), EXECUTOR_SERVICE);
        System.out.println(latch.getId() + "准备好了!");
        // 开始Leader选举
        latch.start();
        System.out.println(latch.getId() + "开始Leader选举!");
    }

    private Curatorframework getCuratorframework() {
        // 创建Curatorframework实例
        return CuratorframeworkFactory.builder()
                .connectString(CuratorframeworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorframeworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorframeworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorframeworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorframeworkProperties.NAMESPACE)
                .build();
    }

    @RequiredArgsConstructor
    private static class LeaderLatchListenerImpl implements LeaderLatchListener {
        private final LeaderLatch LATCH;

        // 被选举为Leader节点时调用
        @SneakyThrows
        @Override
        public void isLeader() {
            System.out.println("--------------------------------" + LATCH.getId() + "被选举为Leader--------------------------------");
            LATCH.getParticipants().forEach(System.out::println);
            // 睡眠5秒就close(该节点会从Leader选举中移除),其他节点会重新进行Leader选举
            Thread.sleep(5000);
            LATCH.close();
        }

        @Override
        public void notLeader() {
            // 节点调用了close方法,只有在LeaderLatch.CloseMode.NOTIFY_LEADER模式下会调用该方法
            // LeaderLatch.CloseMode.SILENT模式下不会调用该方法
            System.out.println("--------------------------------" + LATCH.getId() + "离开,重新进行Leader选举--------------------------------");
        }
    }
}

启动类;

package com.kaven.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 7; i++) {
            EXECUTOR_SERVICE.execute(new LeaderLatchRunnable());
        }
        Thread.sleep(10000000);
    }
}

模拟7个分布式服务节点进行Leader选举,输出如下所示:

pool-1-thread-6准备好了!
pool-1-thread-6开始Leader选举!
pool-1-thread-7准备好了!
pool-1-thread-7开始Leader选举!
pool-1-thread-5准备好了!
pool-1-thread-5开始Leader选举!
pool-1-thread-2准备好了!
pool-1-thread-2开始Leader选举!
pool-1-thread-4准备好了!
pool-1-thread-4开始Leader选举!
pool-1-thread-1准备好了!
pool-1-thread-1开始Leader选举!
pool-1-thread-3准备好了!
pool-1-thread-3开始Leader选举!
--------------------------------pool-1-thread-1被选举为Leader--------------------------------
Participant{id='pool-1-thread-1', isLeader=true}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-1离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-4被选举为Leader--------------------------------
Participant{id='pool-1-thread-4', isLeader=true}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-4离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-3被选举为Leader--------------------------------
Participant{id='pool-1-thread-3', isLeader=true}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-3离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-5被选举为Leader--------------------------------
Participant{id='pool-1-thread-5', isLeader=true}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-5离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-6被选举为Leader--------------------------------
Participant{id='pool-1-thread-6', isLeader=true}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-6离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-7被选举为Leader--------------------------------
Participant{id='pool-1-thread-7', isLeader=true}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-7离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-2被选举为Leader--------------------------------
Participant{id='pool-1-thread-2', isLeader=true}
--------------------------------pool-1-thread-2离开,重新进行Leader选举--------------------------------

CloseMode枚举类:

    
    public enum CloseMode
    {
        
        SILENT,

        
        NOTIFY_LEADER
    }

修改成LeaderLatch.CloseMode.SILENT模式(默认模式,因此也可以不设置closeMode属性):

LeaderLatch latch = new LeaderLatch(curator, "/services/leader",
                Thread.currentThread().getName(), LeaderLatch.CloseMode.SILENT);

输出变成如下所示:

pool-1-thread-7准备好了!
pool-1-thread-7开始Leader选举!
pool-1-thread-3准备好了!
pool-1-thread-3开始Leader选举!
pool-1-thread-6准备好了!
pool-1-thread-6开始Leader选举!
pool-1-thread-4准备好了!
pool-1-thread-4开始Leader选举!
pool-1-thread-5准备好了!
pool-1-thread-5开始Leader选举!
pool-1-thread-2准备好了!
pool-1-thread-2开始Leader选举!
pool-1-thread-1准备好了!
pool-1-thread-1开始Leader选举!
--------------------------------pool-1-thread-1被选举为Leader--------------------------------
Participant{id='pool-1-thread-1', isLeader=true}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-2被选举为Leader--------------------------------
Participant{id='pool-1-thread-2', isLeader=true}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-3被选举为Leader--------------------------------
Participant{id='pool-1-thread-3', isLeader=true}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-6被选举为Leader--------------------------------
Participant{id='pool-1-thread-6', isLeader=true}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-7被选举为Leader--------------------------------
Participant{id='pool-1-thread-7', isLeader=true}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-4被选举为Leader--------------------------------
Participant{id='pool-1-thread-4', isLeader=true}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-5被选举为Leader--------------------------------
Participant{id='pool-1-thread-5', isLeader=true}

很显然监听器实例的notLeader方法没有被调用。

    
    public synchronized void close(CloseMode closeMode) throws IOException
    {
        Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
        Preconditions.checkNotNull(closeMode, "closeMode cannot be null");

        cancelStartTask();

        try
        {
            setNode(null);
            client.removeWatchers();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            throw new IOException(e);
        }
        finally
        {
            client.getConnectionStateListenable().removeListener(listener);

            switch ( closeMode )
            {
            case NOTIFY_LEADER:
            {  
                // 先设置hasLeadership属性(设置完后会调用监听器),再删除监听器
                setLeadership(false);
                listeners.clear();
                break;
            }

            default:
            {
                // 先删除监听器,再设置hasLeadership属性(设置完后会调用监听器,但已经没有监听器了)
                listeners.clear();
                setLeadership(false);
                break;
            }
            }
        }
    }

为什么LeaderLatchRunnable实例的run方法执行结束了,还能继续进行Leader选举,因为执行了latch.start();,有新线程被启动:

    
    public void start() throws Exception
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        // 有新线程被启动
        startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    internalStart();
                }
                finally
                {
                    startTask.set(null);
                }
            }
        }));
    }

LeaderLatch类的public方法如下图所示:

都是一些比较常规的方法,这里只介绍两个await方法,其他方法比较简单。

  • await():调用该方法会导致当前线程等待,直到此实例获得领导权,除非线程被中断或关闭。如果此实例已经是Leader,则此方法立即返回true。否则,当前线程将被禁用并处于休眠状态,直到发生以下三种情况之一:
    • 此实例成为Leader
    • 其他线程中断当前线程
    • 实例已关闭
  • await(long timeout, TimeUnit unit):调用该方法会导致当前线程等待,直到此实例获得领导权,除非线程被中断、指定的等待时间已过或实例已关闭。如果指定的等待时间已过或实例已关闭,则返回false ,如果等待时间小于或等于0,则该方法根本不会等待。如果此实例已经是Leader,则此方法立即返回true。否则,当前线程将被禁用并处于休眠状态,直到发生以下四种情况之一:
    • 此实例成为Leader
    • 其他线程中断当前线程
    • 指定的等待时间已过
    • 实例已关闭

通过await方法,使用LeaderLatch进行Leader选举就像使用CountDownLatch一样方便,Curator框架的Leader选举实现LeaderLatch就介绍到这里,源码以后会进行分析介绍,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/698417.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号