zookeeper的基本概念:
zookeeper的安装与配置参见zookeeper的安装与配置(转黑马官方)_chaofengdev的博客-CSDN博客
zookeeper命令操作数据模型
客户端常用命令
服务端常用命令
zookeeper java api操作curator介绍
curator常用api
package com.itheima;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class curatorTest {
//提升作用域
Curatorframework client;
//==============================================create=========================================================================
@Test
public void testConnect1(){
//获取zookeeper client对象的两种方式。
//1.第一种方式
//重试策略对象
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//获取client对象(Curatorframework)
client = CuratorframeworkFactory.newClient("192.168.200.154:2181", 60 * 1000, 15 * 1000, retryPolicy);
//开启连接
client.start();
}
@Before
public void testConnect2(){
//2.第二种方式
//链式编程
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
client = CuratorframeworkFactory.builder().connectString("192.168.200.154:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("itheima").build();//使用名称空间
client.start();
}
//创建结点:持久、临时、顺序、数据
//1.基本创建
//2.创建结点,带有数据
//3.设置结点类型
//4.创建多级结点
@Test
public void testCreate1() throws Exception {
//1.基本创建
//创建结点时没有指定结点的数据,会默认存储客户端的ip地址
String path = client.create().forPath("/app22");
System.out.println(path);
}
@Test
public void testCreate2() throws Exception {
//2.创建结点,带有数据
String path = client.create().forPath("/app2", "zifushuzu".getBytes(StandardCharsets.UTF_8));
System.out.println(path);
}
@Test
public void testCreate3() throws Exception {
//3.设置结点类型
String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3", "hello".getBytes(StandardCharsets.UTF_8));
System.out.println(path);
}
@Test
public void testCreate4() throws Exception {
//4.创建多级结点
String path = client.create().creatingParentContainersIfNeeded().forPath("/app4/p1","nihaoya".getBytes(StandardCharsets.UTF_8));
System.out.println(path);
}
@After
public void close(){
if(client!=null){
client.close();
}
}
//==================================================get================================================================
@Test
public void testGet1() throws Exception {
//1.查询结点数据 get
byte[] data = client.getData().forPath("/app4/p1");
System.out.println(new String(data));
}
@Test
public void testGet2() throws Exception{
//2.查询结点信息 ls
List child = client.getChildren().forPath("/");
System.out.println(child);
}
@Test
public void testGet3() throws Exception{
//3.查询结点状态信息 ls -s
Stat stat = new Stat();
System.out.println(stat);//查询前为空
client.getData().storingStatIn(stat).forPath("/app4");
System.out.println(stat);//查询结点的状态信息
}
//===============================================set===================================================================
@Test
public void testSet() throws Exception {
Stat stat = client.setData().forPath("/app1", "itcast".getBytes(StandardCharsets.UTF_8));
System.out.println(stat);
}
@Test
public void testSetForVersion() throws Exception {
//查询结点相关信息
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
//查询接地那信息中的版本信息version(本质上是锁的机制)
int version = stat.getVersion();
Stat stat1 = client.setData().withVersion(version).forPath("/app1","haha".getBytes());
}
//==========================================delete=====================================================================
@Test
public void testDelete() throws Exception {
//1.删除单个结点
client.delete().forPath("/app1");
}
@Test
public void testDelete2() throws Exception {
//2.删除带有子节点的结点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
}
@Test
public void testDelete3() throws Exception{
//3.必须成功删除(为了防止网络抖动,本质就是重试几次)
client.delete().guaranteed().forPath("/app2");
}
@Test
public void testDelete4() throws Exception {
//4.回调(一知半解)
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(Curatorframework client, CuratorEvent event) throws Exception {
System.out.println("delete");//提示删除完成
System.out.println(event);//相关信息
}
}).forPath("app4");
}
}
package com.itheima;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.List;
public class curatorWatcherTest {
//提升作用域
Curatorframework client;
//==============================================create=========================================================================
@Test
public void testConnect1(){
//获取zookeeper client对象的两种方式。
//1.第一种方式
//重试策略对象
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
//获取client对象(Curatorframework)
client = CuratorframeworkFactory.newClient("192.168.200.154:2181", 60 * 1000, 15 * 1000, retryPolicy);
//开启连接
client.start();
}
@Before
public void testConnect2(){
//2.第二种方式
//链式编程
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
client = CuratorframeworkFactory.builder().connectString("192.168.200.154:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("itheima").build();//使用名称空间
client.start();
}
//创建结点:持久、临时、顺序、数据
//1.基本创建
//2.创建结点,带有数据
//3.设置结点类型
//4.创建多级结点
@Test
public void testCreate1() throws Exception {
//1.基本创建
//创建结点时没有指定结点的数据,会默认存储客户端的ip地址
String path = client.create().forPath("/app22");
System.out.println(path);
}
@Test
public void testCreate2() throws Exception {
//2.创建结点,带有数据
String path = client.create().forPath("/app2", "zifushuzu".getBytes(StandardCharsets.UTF_8));
System.out.println(path);
}
@Test
public void testCreate3() throws Exception {
//3.设置结点类型
String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3", "hello".getBytes(StandardCharsets.UTF_8));
System.out.println(path);
}
@Test
public void testCreate4() throws Exception {
//4.创建多级结点
String path = client.create().creatingParentContainersIfNeeded().forPath("/app4/p1","nihaoya".getBytes(StandardCharsets.UTF_8));
System.out.println(path);
}
@After
public void close(){
if(client!=null){
client.close();
}
}
//==================================================get================================================================
@Test
public void testGet1() throws Exception {
//1.查询结点数据 get
byte[] data = client.getData().forPath("/app4/p1");
System.out.println(new String(data));
}
@Test
public void testGet2() throws Exception{
//2.查询结点信息 ls
List child = client.getChildren().forPath("/");
System.out.println(child);
}
@Test
public void testGet3() throws Exception{
//3.查询结点状态信息 ls -s
Stat stat = new Stat();
System.out.println(stat);//查询前为空
client.getData().storingStatIn(stat).forPath("/app4");
System.out.println(stat);//查询结点的状态信息
}
//===============================================set===================================================================
@Test
public void testSet() throws Exception {
Stat stat = client.setData().forPath("/app1", "itcast".getBytes(StandardCharsets.UTF_8));
System.out.println(stat);
}
@Test
public void testSetForVersion() throws Exception {
//查询结点相关信息
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app1");
//查询接地那信息中的版本信息version(本质上是锁的机制)
int version = stat.getVersion();
Stat stat1 = client.setData().withVersion(version).forPath("/app1","haha".getBytes());
}
//==========================================delete=====================================================================
@Test
public void testDelete() throws Exception {
//1.删除单个结点
client.delete().forPath("/app1");
}
@Test
public void testDelete2() throws Exception {
//2.删除带有子节点的结点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
}
@Test
public void testDelete3() throws Exception{
//3.必须成功删除(为了防止网络抖动,本质就是重试几次)
client.delete().guaranteed().forPath("/app2");
}
@Test
public void testDelete4() throws Exception {
//4.回调(一知半解)
client.delete().guaranteed().inBackground(new BackgroundCallback() {
@Override
public void processResult(Curatorframework client, CuratorEvent event) throws Exception {
System.out.println("delete");//提示删除完成
System.out.println(event);//相关信息
}
}).forPath("app4");
}
//======================================watch========================================================================
@Test
public void testPathChildrenCache() throws Exception{
//1.创建监听对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
//2.绑定监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(Curatorframework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("child node has changed...");
System.out.println(event);
//监听子节点的数据变更,并且拿到变更后的数据
//1.获取数据类型
PathChildrenCacheEvent.Type type = event.getType();
//2.判断类型是否是update
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
//PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p2', stat=192,209,1643545922168,1643546628992,2,0,0,0,3,0,192
//, data=[49, 49, 49]}}
System.out.println("child node update...");
byte[] data = event.getData().getData();//根据event的输出格式来理解此处代码
System.out.println(new String(data));
}
}
});
//3.开启监听器
pathChildrenCache.start();
while(true){
//死循环
}
}
@Test
public void testNodeCache() throws Exception{
//1.创建NodeCache对象
NodeCache nodeCache = new NodeCache(client, "/app1");
//2.注册监听-理解
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("node has changed...");
//获取修改结点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
//3.开启监听,如果设置为true,开启监听时加载缓存数据。
nodeCache.start(true);
//死循环用于制造客户端一直连接并监听相关结点的情况
//这里一直循环,表示客户端一直存活,服务端有响应会返回;
//否则单元测试结束,表示客户端结束,服务端有结果也无法返回。
while(true){
//死循环
}
}
//=========================================treeCache==================================================================
@Test
public void testTreeCache() throws Exception {
//1.创建监听器
TreeCache treeCache = new TreeCache(client, "/app2");
//2.注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(Curatorframework client, TreeCacheEvent event) throws Exception {
System.out.println("treeCache changed...");
System.out.println(event);
}
});
//3.开启监听
treeCache.start();
while (true) {
}
}
}
分布式锁
模拟12306
package com.itheima;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;
public class Ticket12306 implements Runnable{
private int ticket = 100000000;//票数
private InterProcessMutex interProcessMutex;//互斥锁
Curatorframework client;//客户端对象
public Ticket12306() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
client = CuratorframeworkFactory.builder().connectString("192.168.200.154:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("itheima").build();//使用名称空间
client.start();
interProcessMutex = new InterProcessMutex(client,"/lock");
}
@Override
public void run() {
while(true){
//加锁
try {
interProcessMutex.acquire(3, TimeUnit.SECONDS);
if (ticket > 0) {
System.out.println(Thread.currentThread().getName()+":"+ticket);
ticket--;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
//释放锁
try {
interProcessMutex.release();//突然理解异常的用处了。
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
package com.itheima;
public class lockTest {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
//创建客户端
Thread xiecheng = new Thread(ticket12306, "xiecheng");
Thread feizhu = new Thread(ticket12306, "feizhu");
//开启两个线程来运行买票服务
xiecheng.start();
feizhu.start();
}
}
zookeeper集群搭建
参见:搭建Zookeeper集群(转黑马官方,补充了一点可能遇到的小坑)_chaofengdev的博客-CSDN博客
zookeeper核心理论


