栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Zookeeper选举机制和Zookeeper实现分布式锁

Zookeeper选举机制和Zookeeper实现分布式锁

Zookeeper zookeeper文件目录

bin

conf

zoo.cfg配置文件

myzkData

myid身份标志

…等等

选举机制

出现以下情况会进入leader选举

服务器初始化启动服务器运行期间无法和Leader保持连接

每个zookeeper服务器都具有的东西

1)服务器初始化启动

选票都给myid最大的那个,选出了leader后,后面进来的zookeeper只能成为follower

2)服务器运行期间无法和Leader保持连接

当服务器运行期间无法与Leader保持连接后,无论是当前服务器出错了还是leader出错了,当前服务器都会认为是leader出错而发起选举。这个时候就会出现两种情况

集群中本来就已经存在一个Leader

机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。

集群中确实不存在Leader

选举Leader规则: ①EPOCH大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,服务器id大的胜出

大的直接胜出 ②EPOCH相同,事务id大的胜出 ③事务id相同,服务器id大的胜出

监听器原理

分为zk客户端和服务端

1、zk客户端main()线程

2、创建zkclient

listenerconnect

3、zkclient用getChildren("/",true)获取监听

4、当zk集群中"/"发生节点变化时反应给zkclient->listener

5、zkclient调用watcher中process()做出反应

利用Zookeeper实现分布式锁

在zookeeper集群中创建locks节点
当上锁时:
{
	在locks创建带序号的非永久节点

	判断创建节点的序号{
        如果序号小于0,数据错误
        序号大于0,利用zookeeper watcher对节点删除进行监听,如果删除了节点且为当前序号前一位则继续
        序号等于0,立刻获取锁
    }			
}
释放锁:{
    删除对应创建的非永久节点
}
public class DistributedLock {

    private final ZooKeeper zooKeeper;
    private final CountDownLatch connect = new CountDownLatch(1);
    private final CountDownLatch wake = new CountDownLatch(1);
    private String preNode;
    private String currentNode;


    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        //创建zookeeper客户端
        String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
        zooKeeper = new ZooKeeper(connectString, 2 * 1000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connect.countDown();
                }

                if (event.getType()==Event.EventType.NodeDeleted && Objects.equals(event.getPath(), preNode)) {
                    wake.countDown();
                }
            }
        });
        //如果没有连接成功zookeeper等待
        connect.await();
        //查看zookeeper中是否存在locks节点,如果不存在创建
        Stat stat = zooKeeper.exists("/locks", false);
        if (stat == null) {
            zooKeeper.create("/locks", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    
    public void lock() throws KeeperException, InterruptedException {
        //在locks建立节点scp-0000....
        currentNode = zooKeeper.create("/locks/scp-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("zk-lock建立锁 "+currentNode);
        //获取建立的节点在locks节点下的位置
        List children = zooKeeper.getChildren("/locks", false);
        int index = children.indexOf(currentNode.substring("/locks/".length()));
        // 如果在第0位,则立刻获取这个锁
        if (index == 0) {
            return;
        } else if (index < 0) {
            // 如果小于0,则出现错误
            System.out.println("----------数据ERROR----------");
        } else {
            // 如果大于0,则将所在位置的前一位锁节点记录,阻塞进程,直到监视进程发现前节点消失释放进程
            preNode = "/locks/"+children.get(index - 1);
            zooKeeper.getData(preNode,true,new Stat());
            wake.await();
        }
    }

    
    public void releaseLock() throws KeeperException, InterruptedException {
        zooKeeper.delete(currentNode,-1);
    }

}
public class DistributedLockTest {
    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
        DistributedLock lock1 = new DistributedLock();
        DistributedLock lock2 = new DistributedLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.lock();
                    System.out.println("线程 1 获取锁");
                    Thread.sleep(5*1000);
                    System.out.println("线程 1 释放锁");
                    lock1.releaseLock();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.lock();
                    System.out.println("线程 2 获取锁");
                    Thread.sleep(5*1000);
                    System.out.println("线程 2 释放锁");
                    lock2.releaseLock();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();


    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/728769.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号