栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 数据挖掘与分析

Zookeeper学习笔记(二)应用之分布式锁

Zookeeper学习笔记(二)应用之分布式锁

1. Zookeeper实现分布式锁的特性 特性一:Zookeeper子节点(顺序节点,EPHEMERAL_SEQUENTIAL)可以有序且递增

在父节点(/lock)下创建临时顺序节点,节点的次序编号会按照节点的创建时间依次递增,示意图:

Zookeeper分布式锁可以基于这一点特性实现公平锁,以节点的次序编号来决定哪个节点可以获得锁,即可以用次序编号最小的节点来表示获得锁,每个线程在尝试占用锁之前,首先判断自己节点编号是否是当前最小,如果是,则获取锁。

特性二:ZooKeeper的节点监听机制

当前 /lock 节点下的每一个已经创建好的子节点都设置为监听上一个节点,比如 2 号节点监听 1 号节点,3 号节点监听 2 号节点…依次循坏。当 1 号节点释放锁之后,2 号节点会监听到此事件并获取锁,同样,当 2 号节点释放锁之后,3 号节点也能收到通知。


羊群效应(惊群效应):

所有的请求(获取锁的请求)都在对同一个节点进行监听,当服务器检测到删除事件时,要通知所有的连接,所有的连接同时收到事件,再次并发竞争。

可以基于ZooKeeper的节点监听机制有效避免羊群效应。

2. 实现原理
  1. zookeeper客户端创建持久根节点/lock
  2. 创建临时顺序节点,比如/lock/xxx…001
  3. 查询/lock节点下面所有子节点,然后判断自己的节点是不是排序最小的那个,此时,如果是最小的则会获得锁。如果自己不是最小的,则从所有子节点里面获取比自己次小的一个节点,然后设置监听该节点的事件,然后挂起当前线程。
  4. 当获取锁的线程执行

3. Zookeeper实现分布式锁的简易代码
package com.jbp.zookeeper;


import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;


import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;


@Slf4j
public class ZkLock implements Lock, Watcher {

    
    private static final String ZK_LOCK_PATH = "/lock";

    
    private static final String LOCK_PREFIX = ZK_LOCK_PATH + "/";

    
    private static final int LOCK_WAIT_TIME = 1000 * 30;

    
    private ZooKeeper zooKeeper = null;

    private String lockName;

    
    private String WAIT_LOCK;

    
    private String CURRENT_LOCK;

    
    private CountDownLatch countDownLatch;

    private List exceptionList = new ArrayList<>();

    
    public ZkLock(String zookeeperUrl,String lockName) {
        this.lockName = lockName;
        try {
            // 连接到 zookeeper
            ZooKeeper zooKeeper = new ZooKeeper(zookeeperUrl, LOCK_WAIT_TIME, this);
            // 判断 ZK分布式锁父节点是否已经存在
            Stat flag = zooKeeper.exists(ZK_LOCK_PATH, false);
            if (flag == null){
                // 父节点不存在,则创建
                zooKeeper.create(ZK_LOCK_PATH,new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            }
        } catch (IOException | InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    @SneakyThrows
    @Override
    public void lock() {
        if (exceptionList.size() > 0){
            throw new RuntimeException(exceptionList.get(0));
        }
        if (this.tryLock()){
            log.info("当前线程:{},和临界资源:{},获得了锁",Thread.currentThread().getName(),lockName);
            return;
        }else {
            // 等待锁
            waitForLock(WAIT_LOCK,LOCK_WAIT_TIME);
        }

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }

    
    @Override
    public boolean tryLock() {
        String splitStr = "_lock_";
        if (lockName.contains(splitStr)){
            throw new RuntimeException("锁名有误");
        }
        // 创建临时有序节点
        try {
            CURRENT_LOCK = zooKeeper.create(LOCK_PREFIX + lockName + splitStr,new byte[0],ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            log.info("当前锁:{}已经创建",CURRENT_LOCK);
            // 取出父节点下所有子节点
            List childrenList = zooKeeper.getChildren(ZK_LOCK_PATH, false);
            // 取出所有lockName的锁
            ArrayList lockNames = new ArrayList<>();
            for (String children : childrenList) {
                String childrenLockName = children.split(splitStr)[0];
                if (childrenLockName.equals(lockName)){
                    lockNames.add(children);
                }
            }
            // 排序
            Collections.sort(lockNames);
            log.info("当前线程:{}获取的锁是:{}",Thread.currentThread().getName(),CURRENT_LOCK);
            // 若当前节点为最小节点,则获取锁成功
            if (CURRENT_LOCK.equals(LOCK_PREFIX + lockNames.get(0))){
                return true;
            }
            // 若不是最小节点,则找到自己的前一个节点
            String preChildren = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
            WAIT_LOCK = lockNames.get(Collections.binarySearch(lockNames,preChildren) - 1);
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }

        return false;
    }

    
    @SneakyThrows
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (this.tryLock()){
            return true;
        }
        return waitForLock(WAIT_LOCK,LOCK_WAIT_TIME);
    }

    @Override
    public void unlock() {
        log.info("释放锁:{}",CURRENT_LOCK);
        try {
            zooKeeper.delete(CURRENT_LOCK,-1);
            CURRENT_LOCK = null;
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }finally {
            try {
                zooKeeper.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }


    @Override
    public Condition newCondition() {
        return null;
    }

    
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (this.countDownLatch != null){
            this.countDownLatch.countDown();
        }
    }

    
    private boolean waitForLock(String preChildren,long waitTime) throws InterruptedException, KeeperException {
        // 判断 ZK分布式锁父节点下的 preChildren 子节点是否已经存在
        Stat flag = zooKeeper.exists(LOCK_PREFIX + preChildren, false);
        if (flag != null){
            log.info("当前线程:{}的节点:{}等待锁",Thread.currentThread().getName(),LOCK_PREFIX + preChildren);
            this.countDownLatch = new CountDownLatch(1);
            // 计数等待,若等到前一个节点消失,则 precess中进行countDown,停止等待,获取锁
            this.countDownLatch.await(waitTime,TimeUnit.MILLISECONDS);
            this.countDownLatch = null;
            log.info("当前线程:{}获取到了锁",Thread.currentThread().getName());
        }

        return true;
    }

}

4. 基于Curator的可重入锁
InterProcessMutex zkLock = new InterProcessMutex(Curatorframework, "/lock");

// 获取锁
zkLock.acquire();

// 释放锁
zkLock.release();
5. 优缺点

优点:

  • ZooKeeper分布式锁能有效的解决分布式问题,不可重入问题,使用起来也较为简单。

缺点:

  • 单独维护一套Zookeeper集群,频繁的Watch对Zookeeper集群的压力比较大。
  • 性能不理想,每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能(ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同不到所有的Follower机器上,这样会造成频繁的网络通信)。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/278970.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号