栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

zookeeper分布式锁

zookeeper分布式锁

当多个客户端对zk集群中的资源进行访问时,为了保持资源访问的有序性和稳健性,在每个客户端进行访问时需要在访问期间保持其对该份资源的独占性,用分布式锁来实现此步骤,当某进程访问资源结束后将会释放掉锁以供下一个节点对数据的访问。

分布式锁的实现思路:

  • 接收到客户端的请求后在/locks节点下创建一个临时带序号的节点。
  • 判断当前创建的节点是否为序号最小的节点,是则获取到锁,否则监听其上一个节点。因为默认是当前/locks节点中序号最小的节点优先获取到锁。
  • 获取到锁并处理完业务后该节点释放掉锁(该序号最小的节点被删除)然后后面的锁升为序号最小的节点,递归前面的步骤。

目前有成熟的分布式锁框架,但是为了巩固基础手写一个,涉及到多线程,编写步骤较为复杂。

package com.tommy.case2;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributeLock {
    private final String connectString = "192.168.20.151:2181,192.168.20.152:2181,192.168.20.153:2181";
    private final int sessionTimeout = 2000000;
    private final ZooKeeper zk;
    private int count = 0;
    private String waitPath;
    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);
    private String currentMode;

    public DistributeLock() throws IOException, InterruptedException, KeeperException {
        // 获取连接
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // connectLatch 如果连接上zk,释放掉
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // waitLatch 需要释放
                if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

        // 等待zk正常连接后,才往下执行,增强代码的健壮性。
        connectLatch.await();

        // 判断根节点是否存在
        try {
            byte[] data = zk.getData("/locks", false, null);
        } catch (KeeperException.NoNodeException e) {
            System.out.println("directory /locks is not exists, creating....");
            zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("directory /locks creating success!!!!");
        }
    }

    public void zkLock() throws InterruptedException, KeeperException {
        // 创建节点(临时带序号的节点)
        currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        // 判断是否为序号最小的节点,如果是则获取到锁,否则对上一个节点进行监听
        List children = zk.getChildren("/locks", false);
        count = children.size();
        if (count == 1) {
            return;
        } else {
            Collections.sort(children);
            String thisNode = currentMode.substring("/locks/".length());
            // 获取当前节点到在集合children中的位置
            int index = children.indexOf(thisNode);
            if (index == -1) {
                System.out.println("error Data");
            } else if (index == 0) {
                // only one node in children
                return;
            } else {
                // listen last node's change
                waitPath = "/locks/" + children.get(index - 1);
                zk.getData(waitPath, true, null);
                waitLatch.await();
                return;
            }
        }
    }

    public void unZkLock() {
        // 删除节点以释放掉锁
        try {
            zk.delete(currentMode, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}
package com.tommy.case2;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class DistributeLockTest {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        final DistributeLock lock1 = new DistributeLock();
        final DistributeLock lock2 = new DistributeLock();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.zkLock();
                    System.out.println("线程1启动,获取到锁");
                    Thread.sleep(5 * 1000);
                    lock1.unZkLock();
                    System.out.println("线程1释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        }).start();


        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.zkLock();
                    System.out.println("线程2启动,获取到锁");
                    Thread.sleep(5 * 1000);
                    lock2.unZkLock();
                    System.out.println("线程2释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (KeeperException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/422492.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号