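Add the following dependency to the Spring Boot project
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.2</version>
</dependency>
Creating multi-level nodes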
@Test
void createDeepNode() {
    ZooKeeper client = null;
    try {
        client = new ZooKeeper(ipAddress, 50000, null);
        // The parent must exist before a child can be created, so build the path one level at a time
        client.create("/updateVideo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        client.create("/updateVideo/dance", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        client.create("/updateVideo/dance/20201101", "dance dance dance".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // client.setData("/updateVideo", "this is the node data; business parameters can go here".getBytes(), -1);
        // client.delete("/updateVideo", -1);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // client is still null if the constructor threw, so guard the close
        if (client != null) {
            try {
                client.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
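The test above creates each level by hand because a child node cannot be created under a parent that does not exist yet. As a minimal sketch of how the list of paths to create could be derived from one deep path, the helper below splits an absolute path into its ancestors, shallowest first. The class name ZkPaths and the method prefixes are hypothetical and not part of the ZooKeeper API; the sketch only computes strings and does not talk to a server.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not a ZooKeeper class.
final class ZkPaths {

    /** Returns every ancestor of an absolute znode path, shallowest first, ending with the path itself. */
    static List<String> prefixes(String path) {
        if (path == null || !path.startsWith("/") || path.length() < 2
                || path.endsWith("/") || path.contains("//")) {
            throw new IllegalArgumentException("expected an absolute path such as /a/b: " + path);
        }
        List<String> result = new ArrayList<>();
        // Start at index 1 to skip the leading slash; every later slash closes one ancestor.
        int slash = path.indexOf('/', 1);
        while (slash != -1) {
            result.add(path.substring(0, slash));
            slash = path.indexOf('/', slash + 1);
        }
        result.add(path);
        return result;
    }

    public static void main(String[] args) {
        // Each printed path would be handed to client.create(...) in this order,
        // skipping any node that already exists.
        for (String p : prefixes("/updateVideo/dance/20201101")) {
            System.out.println(p);
        }
    }
}
```

Looping over the returned list and calling create for each entry would reproduce the three calls in the test for any depth.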
Result
[zk: localhost:2181(CONNECTED) 0] ls -R /updateVideo
/updateVideo
/updateVideo/dance
/updateVideo/dance/20201101
Checking whether a node exists
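@Test
void isNodeExist() {
    ZooKeeper client = null;
    try {
        client = new ZooKeeper(ipAddress, 50000, null);
        // exists() returns null when the node is absent
        Stat stat = client.exists("/updateVideo", false);
        logger.info("Does the node exist? {}", stat != null ? "yes" : "no");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // client is still null if the constructor threw, so guard the close
        if (client != null) {
            try {
                client.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}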
Reading node data
@Test
void getData() throws Exception {
    ZooKeeper client = new ZooKeeper(ipAddress, 50000, null);
    byte[] data = client.getData("/updateVideo/dance/20201101", false, null);
    logger.info("result {}", new String(data));
    client.close();
}
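Node data travels as raw bytes, and the snippets here call getBytes() and new String(data) without naming a charset, so the result depends on the JVM's default charset. The sketch below pins both directions to UTF-8 and also guards against a null array, which getData may return for a node created with null data such as /updateVideo. The class NodeDataCodec is a hypothetical helper, not part of the ZooKeeper API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not a ZooKeeper class.
final class NodeDataCodec {

    /** Encodes text the same way on every machine, regardless of the default charset. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes node data; a null array (node without data) becomes an empty string. */
    static String decode(byte[] data) {
        return data == null ? "" : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] stored = encode("dance dance dance");
        System.out.println(decode(stored));
    }
}
```

In the test above, decode(client.getData(...)) would replace new String(data).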
Getting child nodes
@Test
void getChildren() throws Exception {
    ZooKeeper client = new ZooKeeper(ipAddress, 50000, null);
    // getChildren returns the child names relative to the given path
    List<String> children = client.getChildren("/updateVideo", false);
    logger.info("result {}", children);
    client.close();
}
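Setting watches
Option 1
Set a defaultWatcher when constructing the client. Every operation that passes true as its watch flag then uses this one object as its subscriber, whatever the operation is.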
@Test
void register1() throws Exception {
    // Option 1
    ZooKeeper client = new ZooKeeper(ipAddress, 50000, new Watcher() {
        // This is the defaultWatcher argument: the default callback implementation for this client
        @Override
        public void process(WatchedEvent event) {
            System.out.println("This is the client-wide default callback object");
        }
    });
    // exists
    client.exists("/updateVideo", true);
    // getData
    client.getData("/updateVideo/dance", true, null);
    // getChildren
    client.getChildren("/updateVideo", true);
    client.close();
}
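Option 2 (recommended)
Pass a dedicated watcher object into each operation. This guarantees that the object serves as the callback for that path only.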
@Test
void register2() throws Exception {
    ZooKeeper client = new ZooKeeper(ipAddress, 50000, null);
    // Option 2
    // exists
    client.exists("/updateVideo", new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("I am the callback implementation");
        }
    });
    // getData
    client.getData("/updateVideo/dance", new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("I am the callback implementation");
        }
    }, null);
    // getChildren
    client.getChildren("/updateVideo", new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("I am the callback implementation");
        }
    });
    client.close();
}
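One detail worth keeping in mind with both options: a watch registered through exists, getData or getChildren is a one-time trigger, so it fires for the next change only and has to be registered again to see later changes. The toy model below illustrates that behavior in plain Java. It is an in-memory simulation under that assumption, and OneShotWatches with its watch and fire methods is hypothetical, not a ZooKeeper class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-memory model of one-time watches; not the ZooKeeper client.
final class OneShotWatches {

    private final Map<String, List<Consumer<String>>> watchers = new HashMap<>();

    /** Registers a callback for the next change on the given path. */
    void watch(String path, Consumer<String> callback) {
        watchers.computeIfAbsent(path, k -> new ArrayList<>()).add(callback);
    }

    /** Simulates a change: the watchers are removed first, then notified once. Returns how many were notified. */
    int fire(String path) {
        List<Consumer<String>> fired = watchers.remove(path);
        if (fired == null) {
            return 0;
        }
        for (Consumer<String> callback : fired) {
            callback.accept(path);
        }
        return fired.size();
    }

    public static void main(String[] args) {
        OneShotWatches model = new OneShotWatches();
        model.watch("/updateVideo", p -> System.out.println("changed: " + p));
        System.out.println(model.fire("/updateVideo")); // first change reaches the watcher
        System.out.println(model.fire("/updateVideo")); // second change finds no watcher left
    }
}
```

Because the watchers are removed before they are notified, a callback can call watch again from inside itself to keep following the path. Note also that the two tests above close the client right after registering, so no change can arrive before the session ends.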
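Distributed locks
This open-source project is a useful reference:
A sample project that integrates the Curator framework with Spring Boot to operate ZooKeeper and use its distributed tools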
package com.github.hellogithub;

import org.apache.curator.framework.CuratorFramework;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.zookeeper.config.CuratorFrameworkFactoryBean;
import org.springframework.integration.zookeeper.lock.ZookeeperLockRegistry;

@Configuration
public class ZookeeperLockConfiguration {

    @Value("${zookeeper.host:xxxx:2181}")
    private String zkUrl;

    // Creates and manages the Curator client that connects to zkUrl
    @Bean
    public CuratorFrameworkFactoryBean curatorFrameworkFactoryBean() {
        return new CuratorFrameworkFactoryBean(zkUrl);
    }

    // All lock nodes are created under the /HG-lock root
    @Bean
    public ZookeeperLockRegistry zookeeperLockRegistry(CuratorFramework curatorFramework) {
        return new ZookeeperLockRegistry(curatorFramework, "/HG-lock");
    }
}
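Create two test endpoints
As shown below, we first call http://127.0.0.1:9999/lock10. This endpoint acquires the lock object and then sleeps for 10 s.
Right after that, we call http://127.0.0.1:9999/immediate. This endpoint obtains the lock only after /lock10 has released it, so the two acquisitions are roughly 10 s apart. This is an example of zk providing a distributed lock under concurrent access.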
package com.github.hellogithub;

import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

@RestController
public class TestController {

    @Resource
    private LockRegistry lockRegistry;

    @GetMapping("debug")
    public String debug() {
        System.out.println(lockRegistry);
        return "SSSS";
    }

    @GetMapping("/lock10")
    public String lock10() {
        System.out.println("lock10 start " + System.currentTimeMillis());
        final Lock lock = lockRegistry.obtain("lock");
        // Acquire outside the try block so unlock() runs only if the lock is actually held
        lock.lock();
        try {
            System.out.println("lock10 get lock success " + System.currentTimeMillis());
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the exception
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return "OK";
    }

    @GetMapping("/immediate")
    public String immediate() {
        System.out.println("immediate start " + System.currentTimeMillis());
        final Lock lock = lockRegistry.obtain("lock");
        // Blocks until /lock10 has released the same "lock" key
        lock.lock();
        try {
            System.out.println("immediate get lock success " + System.currentTimeMillis());
        } finally {
            lock.unlock();
        }
        return "immediate return";
    }
}
Console output
lock10 start 1633186840089
lock10 get lock success 1633186840146
immediate start 1633186841520
immediate get lock success 1633186850213
How the nodes under zk change while the lock is held
As the listing shows, when the two endpoints request the lock, each one creates an ephemeral sequential node on zk in turn. They then run in that order, release the lock when done, and their ephemeral nodes are deleted.
[zk: localhost:2181(CONNECTED) 9] ls -R /HG-lock
/HG-lock
/HG-lock/lock
/HG-lock/lock/_c_4861b729-4e90-4c94-bdf1-10a55aec213f-lock-0000000001
/HG-lock/lock/_c_f89f6356-4905-43b2-9a4a-5d4f2af9e7bc-lock-0000000000
[zk: localhost:2181(CONNECTED) 10] ls -R /HG-lock
/HG-lock
/HG-lock/lock
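The ordering behind this can be sketched in a few lines. ZooKeeper appends a zero-padded 10-digit counter to the name of every sequential node, and in the usual lock recipe the contender with the smallest counter holds the lock while the others wait for the node just ahead of them. The code below only sorts node names like the two in the listing; it is a simplified illustration, not Curator's actual implementation, and the class LockQueue and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Simplified illustration of lock ordering; not Curator code.
final class LockQueue {

    /** Parses the 10-digit counter that ZooKeeper appends to sequential node names. */
    static int sequence(String nodeName) {
        return Integer.parseInt(nodeName.substring(nodeName.length() - 10));
    }

    /** Returns the contenders in the order they would obtain the lock; index 0 is the current holder. */
    static List<String> inLockOrder(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingInt(LockQueue::sequence));
        return sorted;
    }

    public static void main(String[] args) {
        // Child names copied from the ls -R listing
        List<String> children = Arrays.asList(
                "_c_4861b729-4e90-4c94-bdf1-10a55aec213f-lock-0000000001",
                "_c_f89f6356-4905-43b2-9a4a-5d4f2af9e7bc-lock-0000000000");
        List<String> order = inLockOrder(children);
        System.out.println("holder:  " + order.get(0));
        System.out.println("waiting: " + order.get(1));
    }
}
```

With the two names from the listing, the node ending in 0000000000 sorts first. That is presumably the one created by /lock10, since it was called first, although the listing itself does not say which request owns which node.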



