因此有如下的需求:
(1)整个集群中,只允许一台去跑定时任务
(2)当这台机器宕机后,需要从其他存活的机器中找出一台继续运行
Zookeeper3.5.8的下载及安装请参考:https://blog.csdn.net/qq_44665283/article/details/121038105?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522164044318316780357264764%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=164044318316780357264764&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2blogfirst_rank_ecpm_v1~rank_v31_ecpm-2-121038105.nonecase&utm_term=zoo&spm=1018.2226.3001.4450
(1)监听的原理:
2、zookeeper实现master选举org.apache.zookeeper zookeeper 3.4.14
package com.yyds.quartzstudy.zk;
import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class TimedTask {
private final static String path = "/timetask/lock";
private String machineName;
public TimedTask(String machineName) {
this.machineName = machineName;
}
public void go() {
try {
String zkAddr = "192.168.42.101:2181,192.168.42.102:2181,192.168.42.103:2181";
// 只要执行一次countDown(),等待的线程就会继续执行
CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper(zkAddr, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {// 异步创建
countDownLatch.countDown();
}
});
countDownLatch.await();
System.out.println(machineName + " 连接zk成功...");
toBeMaster(zooKeeper,machineName);
}catch (Exception e){
e.printStackTrace();
}
}
private void toBeMaster(ZooKeeper zooKeeper, String machineName) {
zooKeeper.create(
path,
"test".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,// 没有权限控制
CreateMode.EPHEMERAL,// 临时无序节点
new AsyncCallback.StringCallback() { // 异步创建节点
@Override
public void processResult(int i, String s, Object o, String s1) {
// 如果创建成功
if(i == KeeperException.Code.OK.intValue()){
System.out.println(machineName + " 创建临时节点成功,成为master,执行定时任务...");
try {
// 模拟执行定时任务
System.out.println(machineName + " 开始执行定时任务...");
TimeUnit.SECONDS.sleep(3);
// 模拟master宕机
zooKeeper.delete(path,-1);
System.out.println(machineName + " master宕机...");
} catch (Exception e) {
e.printStackTrace();
}
}else if(i == KeeperException.Code.NODEEXISTS.intValue()){
// 没有创建成功,就去 监听 lock节点
System.out.println(machineName + " 发现master已经存在,开始监听lock节点...");
try {
zooKeeper.exists(path, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 如果监听到该节点被删除,就会尝试成为master
if(watchedEvent.getType() == Event.EventType.NodeDeleted){
System.out.println(machineName + "尝试成为master...");
toBeMaster(zooKeeper, machineName);
}
}
});
} catch (Exception e) {
e.printStackTrace();
}
}else {
System.out.println(machineName + " toBeMater状态异常:" + i);
}
}
},
"ctx"
);
}
}
3、测试
package com.yyds.quartzstudy.zk;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class ZkTest {
public static void main(String[] args) {
IntStream.rangeClosed(1,5)
.mapToObj(index -> "机器" + index)
.map(TimedTask::new)
.map(timedTask -> (Runnable)() -> timedTask.go())
.map(Thread::new)
.forEach(Thread::start);
try {
TimeUnit.SECONDS.sleep(16);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4、测试结果:
机器4 连接zk成功...
机器5 连接zk成功...
机器2 连接zk成功...
机器3 连接zk成功...
机器1 连接zk成功...
机器5 创建临时节点成功,成为master,执行定时任务...
机器5 开始执行定时任务...
机器1 发现master已经存在,开始监听lock节点...
机器2 发现master已经存在,开始监听lock节点...
机器4 发现master已经存在,开始监听lock节点...
机器3 发现master已经存在,开始监听lock节点...
机器5 master宕机...
机器3尝试成为master...
机器1尝试成为master...
机器2尝试成为master...
机器4尝试成为master...
机器1 创建临时节点成功,成为master,执行定时任务...
机器4 发现master已经存在,开始监听lock节点...
机器2 发现master已经存在,开始监听lock节点...
机器3 发现master已经存在,开始监听lock节点...
机器1 开始执行定时任务...
机器2尝试成为master...
机器3尝试成为master...
机器4尝试成为master...
机器1 master宕机...
机器2 创建临时节点成功,成为master,执行定时任务...
机器2 开始执行定时任务...
机器4 发现master已经存在,开始监听lock节点...
机器3 发现master已经存在,开始监听lock节点...



