栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

接上篇:一套方案轻松实现Zookeeper服务器动态上下线和分布式锁

接上篇:一套方案轻松实现Zookeeper服务器动态上下线和分布式锁

}

} catch (IOException e) {

e.printStackTrace();

}

}

}

client端:

import org.I0Itec.zkclient.IZkChildListener;

import org.I0Itec.zkclient.ZkClient;

import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

import java.net.Socket;

import java.util.ArrayList;

import java.util.List;

import java.util.Random;

// 注册监听zk指定目录,

//维护自己本地一个servers信息,收到通知要进行更新

//发送时间查询请求并接受服务端返回的数据

public class Client {

//获取zkclient

ZkClient zkClient = null;

//维护一个serversi 信息集合

ArrayList infos = new ArrayList();

private void connectZk() {

// 创建zkclient

zkClient = new ZkClient(“linux121:2181,linux122:2181”);

/

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

/第一次获取服务器信息,所有的子节点

final List childs = zkClient.getChildren("/servers");

for (String child : childs) {

//存储着ip+port

final Object o = zkClient.readData("/servers/" + child);

infos.add(String.valueOf(o));

}

//对servers目录进行监听

zkClient.subscribeChildChanges("/servers", new IZkChildListener() {

public void handleChildChange(String s, List children) throws Exception {

//接收到通知,说明节点发生了变化,client需要更新infos集合中的数据

ArrayList list = new ArrayList();

//遍历更新过后的所有节点信息

for (String path : children) {

final Object o = zkClient.readData("/servers/" + path);

list.add(String.valueOf(o));

}

//最新数据覆盖老数据

infos = list;

System.out.println("–》接收到通知,最新服务器信息为:" + infos);

}

});

}

//发送时间查询的请求

public void sendRequest() throws IOException {

//目标服务器地址

final Random random = new Random();

final int i = random.nextInt(infos.size()); //随机选择一个服务器

final String ipPort = infos.get(i);

final String[] arr = ipPort.split(";

//建立socket连接

final Socket socket = new Socket(arr[0], Integer.parseInt(arr[1]));

final OutputStream out = socket.getOutputStream();

final InputStream in = socket.getInputStream();

//发送数据

out.write(“query time”.getBytes());

out.flush();

//接收返回结果

final byte[] b = new byte[1024];

in.read(b);//读取服务端返回数据

System.out.println(“client端接收到server:+” + ipPort + “+返回结果:” + new String(b));

//释放资源

in.close();

out.close();

socket.close();

}

public static void main(String[] args) throws InterruptedException {

final Client client = new Client();

client.connectZk(); //监听器逻辑

while (true) {

try {

client.sendRequest(); //发送请求

} catch (IOException e) {

e.printStackTrace();

try {

client.sendRequest();

} catch (IOException e1) {

e1.printStackTrace();

}

}

//每隔几秒中发送一次请求到服务端

Thread.sleep(2000);

}

}

}

分布式锁:

=====

1.什么是锁:

=======

  • 在单机程序中,当存在多个线程可以同时改变某个变量(可变共享变量)时,为了保证线程安全 (数据不能出现脏数据)就需要对变量或代码块做同步,使其在修改这种变量时能够串⾏执⾏消除并 发修改变量。

  • 对变量或者堆代码码块做同步本质上就是加锁。⽬的就是实现多个线程在⼀个时刻同⼀个代码块只 能有⼀个线程可执⾏

2. 分布式锁

========

分布式的环境中会不会出现脏数据的情况呢?类似单机程序中线程安全的问题。观察下⾯的例⼦:

上⾯的设计是存在线程安全问题。

问题:

假设Redis ⾥⾯的某个商品库存为 1;此时两个⽤户同时下单,其中⼀个下单请求执⾏到第 3 步,更新 数据库的库存为 0,但是第 4 步还没有执⾏。

⽽另外⼀个⽤户下单执⾏到了第 2 步,发现库存还是 1,就继续执⾏第 3 步。但是商品库存已经为0, 所以如果数据库没有限制就会出现超卖的问题。

解决方案:

公司业务发展迅速,系统应对并发不断提⾼,解决⽅案是要增加⼀台机器,结果会出现更⼤的问题:

利⽤Zookeeper可以创建临时带序号节点的特性来实现⼀个分布式锁。

分布式锁的作⽤:在整个系统提供⼀个全局、唯⼀的锁,在分布式系统中每个系统在进⾏相关操作的时 候需要获取到该锁,才能执⾏相应操作。

zk实现分布式锁:

=========

利⽤Zookeeper可以创建临时带序号节点的特性来实现⼀个分布式锁。

实现思路:

  • 锁就是zk指定⽬录下序号最⼩的临时序列节点,多个系统的多个线程都要在此⽬录下创建临时的顺 序节点,因为Zk会为我们保证节点的顺序性,所以可以利⽤节点的顺序进⾏锁的判断。

  • 每个线程都是先创建临时顺序节点,然后获取当前⽬录下最⼩的节点(序号),判断最⼩节点是不是 当前节点,如果是那么获取锁成功,如果不是那么获取锁失败。

  • 获取锁失败的线程获取当前节点上⼀个临时顺序节点,并对对此节点进⾏监听,当该节点删除的时 候(上⼀个线程执⾏结束删除或者是掉线zk删除临时节点)这个线程会获取到通知,代表获取到了 锁。

流程图:

分布式锁的具体代码实现:

整体框架:

//zk实现分布式锁

public class DisLockTest {

public static void main(String[] args) {

//使用10个线程模拟分布式环境

for (int i = 0; i < 10; i++) {

new Thread(new DisLockRunnable()).start();//启动线程

}

}

static class DisLockRunnable implements Runnable {

public void run() {

//每个线程具体的任务,每个线程就是抢锁,

final DisClient client = new DisClient();

client.getDisLock();

//模拟获取锁之后的其它动作

try {

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}

//释放锁

client.deleteLock();

}

}

}

具体抢锁类:

import org.I0Itec.zkclient.IZkDataListener;

import org.I0Itec.zkclient.ZkClient;

import java.util.Collections;

import java.util.List;

import java.util.concurrent.CountDownLatch;

//抢锁

//1. 去zk创建临时序列节点,并获取到序号

//2. 判断自己创建节点序号是否是当前节点最小序号,如果是则获取锁

//执行相关操作,最后要释放锁

//3. 不是最小节点,当前线程需要等待,等待你的前一个序号的节点

//被删除,然后再次判断自己是否是最小节点。。。

public class DisClient {

public DisClient() {

//初始化zk的/distrilocl节点,会出现线程安全问题

synchronized (DisClient.class){

if (!zkClient.exists("/distrilock")) {

zkClient.createPersistent("/distrilock");

}

}

}

//前一个节点

String beforNodePath;

String currentNoePath; //当前节点

//获取到zkClient

private ZkClient zkClient = new ZkClient(“linux121:2181,linux122:2181”);

//把抢锁过程为两部分,一部分是创建节点,比较序号,另一部分是等待锁

//完整获取锁方法

public void getDisLock() {

//获取到当前线程名称

final String threadName = Thread.currentThread().getName();

//首先调用tryGetLock

if (tryGetLock()) {

//说明获取到锁

System.out.println(threadName + “:获取到了锁”);

} else {

// 没有获取到锁,

System.out.println(threadName + “:获取锁失败,进入等待状态”);

waitForLock(); //等待锁

//递归获取锁

getDisLock();

}

}

CountDownLatch countDownLatch = null;

//尝试获取锁

public boolean tryGetLock() {

//创建临时顺序节点,/distrilock/序号

if (null == currentNoePath || “”.equals(currentNoePath)) {

currentNoePath = zkClient.createEphemeralSequential("/distrilock/", “lock”);

}

//获取到/distrilock下所有的子节点

final List childs = zkClient.getChildren("/distrilock");

//对节点信息进行排序

Collections.sort(childs); //默认是升序

final String minNode = childs.get(0); //最小序号节点

//判断自己创建节点是否与最小序号一致

if (currentNoePath.equals("/distrilock/" + minNode)) {

//说明当前线程创建的就是序号最小节点

return true;

} else {

//说明最小节点不是自己创建,要监控自己当前节点序号前一个的节点

final int i = Collections.binarySearch(childs, currentNoePath.substring("/distrilock/".length()));

//前一个(lastNodeChild是不包括父节点)

String lastNodeChild = childs.get(i - 1);

beforNodePath = “/distrilock/” + lastNodeChild; //获取前一个节点,并告知获取锁失败

}

return false;

}

//等待之前节点释放锁,如何判断锁被释放,需要唤醒线程继续尝试tryGetLock

public void waitForLock() {

//准备一个监听器

final IZkDataListener iZkDataListener = new IZkDataListener() {

public void handleDataChange(String s, Object o) throws Exception {

}

//删除

public void handleDataDeleted(String s) throws Exception {

//提醒当前线程再次获取锁

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/674181.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号