选举机制bin
conf
zoo.cfg配置文件
myzkData
myid身份标志
…等等
出现以下情况会进入leader选举
服务器初始化启动服务器运行期间无法和Leader保持连接
每个zookeeper服务器都具有的东西
1)服务器初始化启动
选票都给myid最大的那个,选出了leader后,后面进来的zookeeper只能成为follower
2)服务器运行期间无法和Leader保持连接
当服务器运行期间无法与Leader保持连接后,无论是当前服务器出错了还是leader出错了,当前服务器都会认为是leader出错而发起选举。这个时候就会出现两种情况
集群中本来就已经存在一个Leader
机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。
集群中确实不存在Leader
选举Leader规则: ①EPOCH大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,服务器id大的胜出
大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,服务器id大的胜出
监听器原理分为zk客户端和服务端
1、zk客户端main()线程
2、创建zkclient
listenerconnect
3、zkclient用getChildren("/",true)获取监听
4、当zk集群中"/"发生节点变化时反应给zkclient->listener
5、zkclient调用watcher中process()做出反应
利用Zookeeper实现分布式锁在zookeeper集群中创建locks节点
当上锁时:
{
在locks创建带序号的非永久节点
判断创建节点的序号{
如果序号小于0,数据错误
序号大于0,利用zookeeper watcher对节点删除进行监听,如果删除了节点且为当前序号前一位则继续
序号等于0,立刻获取锁
}
}
释放锁:{
删除对应创建的非永久节点
}
public class DistributedLock {
private final ZooKeeper zooKeeper;
private final CountDownLatch connect = new CountDownLatch(1);
private final CountDownLatch wake = new CountDownLatch(1);
private String preNode;
private String currentNode;
public DistributedLock() throws IOException, InterruptedException, KeeperException {
//创建zookeeper客户端
String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
zooKeeper = new ZooKeeper(connectString, 2 * 1000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connect.countDown();
}
if (event.getType()==Event.EventType.NodeDeleted && Objects.equals(event.getPath(), preNode)) {
wake.countDown();
}
}
});
//如果没有连接成功zookeeper等待
connect.await();
//查看zookeeper中是否存在locks节点,如果不存在创建
Stat stat = zooKeeper.exists("/locks", false);
if (stat == null) {
zooKeeper.create("/locks", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
public void lock() throws KeeperException, InterruptedException {
//在locks建立节点scp-0000....
currentNode = zooKeeper.create("/locks/scp-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("zk-lock建立锁 "+currentNode);
//获取建立的节点在locks节点下的位置
List children = zooKeeper.getChildren("/locks", false);
int index = children.indexOf(currentNode.substring("/locks/".length()));
// 如果在第0位,则立刻获取这个锁
if (index == 0) {
return;
} else if (index < 0) {
// 如果小于0,则出现错误
System.out.println("----------数据ERROR----------");
} else {
// 如果大于0,则将所在位置的前一位锁节点记录,阻塞进程,直到监视进程发现前节点消失释放进程
preNode = "/locks/"+children.get(index - 1);
zooKeeper.getData(preNode,true,new Stat());
wake.await();
}
}
public void releaseLock() throws KeeperException, InterruptedException {
zooKeeper.delete(currentNode,-1);
}
}
public class DistributedLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
DistributedLock lock1 = new DistributedLock();
DistributedLock lock2 = new DistributedLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.lock();
System.out.println("线程 1 获取锁");
Thread.sleep(5*1000);
System.out.println("线程 1 释放锁");
lock1.releaseLock();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.lock();
System.out.println("线程 2 获取锁");
Thread.sleep(5*1000);
System.out.println("线程 2 释放锁");
lock2.releaseLock();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}



