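In distributed computing, Leader election is the process of designating a single process as the organizer of a task that is distributed across several computers (nodes). Before the task starts, no node in the network knows which node will act as the task's Leader. Once the Leader election algorithm has run, however, every node in the network recognizes one specific, unique node as the task Leader.
LeaderLatch
The Curator framework provides two Leader election implementations, LeaderLatch and LeaderSelector. This post covers how to use LeaderLatch.
Test code
The CuratorframeworkProperties class (supplies the configuration that CuratorFramework needs):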
package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorframeworkProperties {
    // ZooKeeper connection string
    public static final String CONNECT_ADDRESS = "192.168.31.175:9000";
    // Connection timeout (ms)
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session timeout (ms)
    public static final int SESSION_TIMEOUT_MS = 10000;
    // Namespace
    public static final String NAMESPACE = "MyNamespace";
    // Retry policy: base sleep time of 1000 ms, at most 3 retries
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
}
The LeaderLatchRunnable class (implements the Runnable interface and simulates a distributed service node taking part in Leader election):
package com.kaven.zookeeper;

import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LeaderLatchRunnable implements Runnable {

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    @SneakyThrows
    @Override
    public void run() {
        // Each CuratorFramework instance stands for a different distributed service node
        CuratorFramework curator = getCuratorFramework();
        curator.start();
        assert curator.getState().equals(CuratorFrameworkState.STARTED);

        // Simulate service nodes joining at random times
        int randomSleep = new Random().nextInt(1000);
        Thread.sleep(randomSleep);

        // Create the LeaderLatch instance used for Leader election
        // curator is the CuratorFramework instance used to talk to ZooKeeper
        // "/services/leader" is the latchPath; every participant creates an ephemeral sequential
        // child node under it, and the participant with the lowest sequence number is the Leader
        // The thread name (Thread.currentThread().getName()) serves as the id of the service node
        // LeaderLatch.CloseMode.NOTIFY_LEADER is the close mode, i.e. how listeners are treated on close
        LeaderLatch latch = new LeaderLatch(curator, "/services/leader",
                Thread.currentThread().getName(), LeaderLatch.CloseMode.NOTIFY_LEADER);

        // Attach a listener (a LeaderLatchListenerImpl instance) to the LeaderLatch
        // EXECUTOR_SERVICE is the Executor on which the listener callbacks run
        latch.addListener(new LeaderLatchListenerImpl(latch), EXECUTOR_SERVICE);
        System.out.println(latch.getId() + " is ready!");

        // Start Leader election
        latch.start();
        System.out.println(latch.getId() + " started Leader election!");
    }

    private CuratorFramework getCuratorFramework() {
        // Create the CuratorFramework instance
        return CuratorFrameworkFactory.builder()
                .connectString(CuratorframeworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorframeworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorframeworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorframeworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorframeworkProperties.NAMESPACE)
                .build();
    }

    @RequiredArgsConstructor
    private static class LeaderLatchListenerImpl implements LeaderLatchListener {

        private final LeaderLatch latch;

        // Called when this node is elected Leader
        @SneakyThrows
        @Override
        public void isLeader() {
            System.out.println("--------------------------------" + latch.getId() + " was elected Leader--------------------------------");
            latch.getParticipants().forEach(System.out::println);
            // Sleep 5 seconds, then close (this node leaves the election) so the others elect a new Leader
            Thread.sleep(5000);
            latch.close();
        }

        @Override
        public void notLeader() {
            // Reached here because the node called close(); on close, this method is invoked
            // only in LeaderLatch.CloseMode.NOTIFY_LEADER mode,
            // not in LeaderLatch.CloseMode.SILENT mode
            System.out.println("--------------------------------" + latch.getId() + " left, re-running Leader election--------------------------------");
        }
    }
}
Launcher class:
package com.kaven.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // Start 7 simulated service nodes, each on its own thread
        for (int i = 0; i < 7; i++) {
            EXECUTOR_SERVICE.execute(new LeaderLatchRunnable());
        }
        // Keep the JVM alive long enough to watch the elections
        Thread.sleep(10000000);
    }
}
Simulating 7 distributed service nodes taking part in Leader election produces the following output:
pool-1-thread-6 is ready!
pool-1-thread-6 started Leader election!
pool-1-thread-7 is ready!
pool-1-thread-7 started Leader election!
pool-1-thread-5 is ready!
pool-1-thread-5 started Leader election!
pool-1-thread-2 is ready!
pool-1-thread-2 started Leader election!
pool-1-thread-4 is ready!
pool-1-thread-4 started Leader election!
pool-1-thread-1 is ready!
pool-1-thread-1 started Leader election!
pool-1-thread-3 is ready!
pool-1-thread-3 started Leader election!
--------------------------------pool-1-thread-1 was elected Leader--------------------------------
Participant{id='pool-1-thread-1', isLeader=true}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-1 left, re-running Leader election--------------------------------
--------------------------------pool-1-thread-4 was elected Leader--------------------------------
Participant{id='pool-1-thread-4', isLeader=true}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-4 left, re-running Leader election--------------------------------
--------------------------------pool-1-thread-3 was elected Leader--------------------------------
Participant{id='pool-1-thread-3', isLeader=true}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-3 left, re-running Leader election--------------------------------
--------------------------------pool-1-thread-5 was elected Leader--------------------------------
Participant{id='pool-1-thread-5', isLeader=true}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-5 left, re-running Leader election--------------------------------
--------------------------------pool-1-thread-6 was elected Leader--------------------------------
Participant{id='pool-1-thread-6', isLeader=true}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-6 left, re-running Leader election--------------------------------
--------------------------------pool-1-thread-7 was elected Leader--------------------------------
Participant{id='pool-1-thread-7', isLeader=true}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-7 left, re-running Leader election--------------------------------
--------------------------------pool-1-thread-2 was elected Leader--------------------------------
Participant{id='pool-1-thread-2', isLeader=true}
--------------------------------pool-1-thread-2 left, re-running Leader election--------------------------------
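In the output above, leadership is handed over in exactly the order of the participant list printed by the first Leader. As far as I understand, this is because each participant is ordered by the sequence number of the node it creates under latchPath, and the lowest remaining number leads. The sketch below only models that rule in plain Java; the class name SuccessionSketch, the method leadershipOrder and the sequence numbers are all assumptions made for illustration, not Curator code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SuccessionSketch {

    // Given each participant's (hypothetical) sequence number, return the order
    // in which participants would lead if every Leader closes its latch in turn.
    static List<String> leadershipOrder(Map<String, Integer> sequenceById) {
        // Sort participants by sequence number, lowest first
        TreeMap<Integer, String> bySequence = new TreeMap<>();
        sequenceById.forEach((id, sequence) -> bySequence.put(sequence, id));

        List<String> leaders = new ArrayList<>();
        while (!bySequence.isEmpty()) {
            // The lowest remaining sequence number leads; "closing" removes it
            leaders.add(bySequence.pollFirstEntry().getValue());
        }
        return leaders;
    }

    public static void main(String[] args) {
        // Assumed sequence numbers, chosen to match the participant list in the output
        Map<String, Integer> sequenceById = Map.of(
                "pool-1-thread-1", 0,
                "pool-1-thread-4", 1,
                "pool-1-thread-3", 2,
                "pool-1-thread-5", 3,
                "pool-1-thread-6", 4,
                "pool-1-thread-7", 5,
                "pool-1-thread-2", 6);
        leadershipOrder(sequenceById).forEach(System.out::println);
    }
}
```

With these assumed numbers the printed order is thread-1, 4, 3, 5, 6, 7, 2, the same succession seen in the real run. Note that the order in which threads print their start message need not match this order, since node creation happens asynchronously.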
The CloseMode enum:
public enum CloseMode
{
    SILENT,
    NOTIFY_LEADER
}
Switch to LeaderLatch.CloseMode.SILENT mode (this is the default, so the closeMode argument can also be omitted):
LeaderLatch latch = new LeaderLatch(curator, "/services/leader",
        Thread.currentThread().getName(), LeaderLatch.CloseMode.SILENT);
The output then becomes:
pool-1-thread-7 is ready!
pool-1-thread-7 started Leader election!
pool-1-thread-3 is ready!
pool-1-thread-3 started Leader election!
pool-1-thread-6 is ready!
pool-1-thread-6 started Leader election!
pool-1-thread-4 is ready!
pool-1-thread-4 started Leader election!
pool-1-thread-5 is ready!
pool-1-thread-5 started Leader election!
pool-1-thread-2 is ready!
pool-1-thread-2 started Leader election!
pool-1-thread-1 is ready!
pool-1-thread-1 started Leader election!
--------------------------------pool-1-thread-1 was elected Leader--------------------------------
Participant{id='pool-1-thread-1', isLeader=true}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-2 was elected Leader--------------------------------
Participant{id='pool-1-thread-2', isLeader=true}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-3 was elected Leader--------------------------------
Participant{id='pool-1-thread-3', isLeader=true}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-6 was elected Leader--------------------------------
Participant{id='pool-1-thread-6', isLeader=true}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-7 was elected Leader--------------------------------
Participant{id='pool-1-thread-7', isLeader=true}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-4 was elected Leader--------------------------------
Participant{id='pool-1-thread-4', isLeader=true}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-5 was elected Leader--------------------------------
Participant{id='pool-1-thread-5', isLeader=true}
Clearly, the listener's notLeader method was never called. The source of LeaderLatch's close method shows why:
public synchronized void close(CloseMode closeMode) throws IOException
{
    Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
    Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
    cancelStartTask();
    try
    {
        setNode(null);
        client.removeWatchers();
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        throw new IOException(e);
    }
    finally
    {
        client.getConnectionStateListenable().removeListener(listener);
        switch ( closeMode )
        {
            case NOTIFY_LEADER:
            {
                // Set hasLeadership first (which notifies the listeners), then remove the listeners
                setLeadership(false);
                listeners.clear();
                break;
            }
            default:
            {
                // Remove the listeners first, then set hasLeadership (listeners would be notified, but none are left)
                listeners.clear();
                setLeadership(false);
                break;
            }
        }
    }
}
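The only difference between the two branches is the order of the same two calls. To make that concrete, here is a minimal stand-alone model of the ordering. CloseOrderSketch, its Listener interface and notificationsOnClose are hypothetical names invented for this sketch, and listeners are invoked synchronously here, whereas the real LeaderLatch dispatches them through an Executor.

```java
import java.util.ArrayList;
import java.util.List;

class CloseOrderSketch {

    enum CloseMode { SILENT, NOTIFY_LEADER }

    interface Listener {
        void notLeader();
    }

    private final List<Listener> listeners = new ArrayList<>();
    private boolean hasLeadership = true; // pretend this instance is currently the Leader

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    // Flip the flag and notify whichever listeners are still registered
    private void setLeadership(boolean newValue) {
        boolean oldValue = hasLeadership;
        hasLeadership = newValue;
        if (oldValue && !newValue) {
            for (Listener listener : listeners) {
                listener.notLeader();
            }
        }
    }

    void close(CloseMode closeMode) {
        if (closeMode == CloseMode.NOTIFY_LEADER) {
            setLeadership(false); // listeners are still registered, so they fire
            listeners.clear();
        } else {
            listeners.clear();    // nobody is left to notify
            setLeadership(false);
        }
    }

    // Returns how many times notLeader fired when closing in the given mode
    static int notificationsOnClose(CloseMode closeMode) {
        CloseOrderSketch latch = new CloseOrderSketch();
        int[] calls = {0};
        latch.addListener(() -> calls[0]++);
        latch.close(closeMode);
        return calls[0];
    }

    public static void main(String[] args) {
        System.out.println("NOTIFY_LEADER: " + notificationsOnClose(CloseMode.NOTIFY_LEADER));
        System.out.println("SILENT: " + notificationsOnClose(CloseMode.SILENT));
    }
}
```

In this model the listener fires once under NOTIFY_LEADER and not at all under SILENT, which matches the two runs shown earlier.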
Why does Leader election keep going after the run method of each LeaderLatchRunnable instance has returned? Because latch.start() was called, and it hands the startup work to a background thread:
public void start() throws Exception
{
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
    // A background thread is started here
    startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
    {
        @Override
        public void run()
        {
            try
            {
                internalStart();
            }
            finally
            {
                startTask.set(null);
            }
        }
    }));
}
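The same pattern can be reproduced with the JDK alone: a method that submits work to an executor returns at once, and the work carries on after the caller is done. This is only an analogy under my own assumptions. BackgroundStartSketch and its start method are hypothetical stand-ins, and the 200 ms sleep merely imitates waiting for the connection and joining the election.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BackgroundStartSketch {

    // Hypothetical stand-in for start(): returns immediately while the
    // submitted task keeps running on the executor's thread.
    static CountDownLatch start(ExecutorService executor) {
        CountDownLatch finished = new CountDownLatch(1);
        executor.execute(() -> {
            try {
                Thread.sleep(200); // imitate connecting and joining the election
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown();
            }
        });
        return finished;
    }

    // True if the work was still pending right after start() returned
    // and then completed later on its own.
    static boolean workOutlivesCaller() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch finished = start(executor);
            boolean pendingAfterReturn = finished.getCount() == 1;
            boolean completedLater = finished.await(5, TimeUnit.SECONDS);
            return pendingAfterReturn && completedLater;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("work outlived the caller: " + workOutlivesCaller());
    }
}
```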
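The public methods of the LeaderLatch class are shown in the figure below:
Most of them are routine and easy to follow, so only the two await methods are described here.
- await(): causes the current thread to wait until this instance acquires leadership, unless the thread is interrupted or the instance is closed. If this instance is already the Leader, the method returns immediately (it is declared void, so it has no return value). Otherwise the current thread is disabled for scheduling purposes and lies dormant until one of three things happens:
  - this instance becomes the Leader
  - another thread interrupts the current thread
  - the instance is closed
- await(long timeout, TimeUnit unit): causes the current thread to wait until this instance acquires leadership, unless the thread is interrupted, the specified waiting time elapses, or the instance is closed. If the waiting time elapses or the instance is closed, the method returns false; if the waiting time is less than or equal to 0, the method does not wait at all. If this instance is already the Leader, the method returns true immediately. Otherwise the current thread is disabled for scheduling purposes and lies dormant until one of four things happens:
  - this instance becomes the Leader
  - another thread interrupts the current thread
  - the specified waiting time elapses
  - the instance is closed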
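The timed variant behaves much like CountDownLatch.await(timeout, unit) from the JDK, so its return values can be tried out without a ZooKeeper server. The sketch below is an analogy only: TimedAwaitSketch and awaitLeadership are hypothetical names, "leadership" is modelled as the latch count reaching zero, and an interrupt is reported as false here instead of being rethrown as InterruptedException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class TimedAwaitSketch {

    // Wait up to timeoutMs for "leadership", modelled as the count reaching zero.
    // Returns true if it was acquired in time, false otherwise.
    static boolean awaitLeadership(CountDownLatch leadership, long timeoutMs) {
        try {
            return leadership.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch leadership = new CountDownLatch(1);

        // Not the Leader yet: the wait times out
        System.out.println("before election: " + awaitLeadership(leadership, 100));

        // A timeout of 0 does not wait at all
        System.out.println("zero timeout:    " + awaitLeadership(leadership, 0));

        // Once "elected", the call returns right away
        leadership.countDown();
        System.out.println("after election:  " + awaitLeadership(leadership, 100));
    }
}
```

A real service would call latch.await(timeout, unit) on its LeaderLatch in the same way and run the Leader-only work only when the call returns true.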
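With the await methods, using LeaderLatch for Leader election is as convenient as using CountDownLatch. That wraps up this introduction to LeaderLatch, the Curator framework's Leader election implementation; a walkthrough of the source code will follow in a later post. If I have gotten anything wrong, or if you see things differently, feel free to add your thoughts in the comments.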



