栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Zookeeper学习总结

Zookeeper学习总结

初识zookeeper

zookeeper的基本概念:

zookeeper的安装与配置

参见zookeeper的安装与配置(转黑马官方)_chaofengdev的博客-CSDN博客

zookeeper命令操作

数据模型

客户端常用命令

服务端常用命令

zookeeper java api操作

curator介绍

curator常用api

package com.itheima;
​
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
​
import java.nio.charset.StandardCharsets;
import java.util.List;
​
public class curatorTest {
    //提升作用域
    Curatorframework client;
//==============================================create=========================================================================
    
    @Test
    public void testConnect1(){
        //获取zookeeper client对象的两种方式。
        //1.第一种方式
        
        //重试策略对象
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        //获取client对象(Curatorframework)
        client = CuratorframeworkFactory.newClient("192.168.200.154:2181", 60 * 1000, 15 * 1000, retryPolicy);
        //开启连接
        client.start();
    }
​
    @Before
    public void testConnect2(){
        //2.第二种方式
        //链式编程
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client = CuratorframeworkFactory.builder().connectString("192.168.200.154:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).namespace("itheima").build();//使用名称空间
        client.start();
    }
​
    //创建结点:持久、临时、顺序、数据
    //1.基本创建
    //2.创建结点,带有数据
    //3.设置结点类型
    //4.创建多级结点
    @Test
    public void testCreate1() throws Exception {
        //1.基本创建
        //创建结点时没有指定结点的数据,会默认存储客户端的ip地址
        String path = client.create().forPath("/app22");
        System.out.println(path);
    }
​
    @Test
    public void testCreate2() throws Exception {
        //2.创建结点,带有数据
        String path = client.create().forPath("/app2", "zifushuzu".getBytes(StandardCharsets.UTF_8));
        System.out.println(path);
    }
​
    @Test
    public void testCreate3() throws Exception {
        //3.设置结点类型
        String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(path);
    }
​
    @Test
    public void testCreate4() throws Exception {
        //4.创建多级结点
        String path = client.create().creatingParentContainersIfNeeded().forPath("/app4/p1","nihaoya".getBytes(StandardCharsets.UTF_8));
        System.out.println(path);
    }
​
    @After
    public void close(){
        if(client!=null){
            client.close();
        }
    }
​
    //==================================================get================================================================
​
    
    @Test
    public void testGet1() throws Exception {
        //1.查询结点数据 get
        byte[] data = client.getData().forPath("/app4/p1");
        System.out.println(new String(data));
    }
​
    @Test
    public void testGet2() throws Exception{
        //2.查询结点信息 ls
        List child = client.getChildren().forPath("/");
        System.out.println(child);
    }
​
    @Test
    public void testGet3() throws Exception{
        //3.查询结点状态信息 ls -s
        Stat stat = new Stat();
        System.out.println(stat);//查询前为空
        client.getData().storingStatIn(stat).forPath("/app4");
        System.out.println(stat);//查询结点的状态信息
    }
    //===============================================set===================================================================
​
    
    @Test
    public void testSet() throws Exception {
        Stat stat = client.setData().forPath("/app1", "itcast".getBytes(StandardCharsets.UTF_8));
        System.out.println(stat);
    }
​
    @Test
    public void testSetForVersion() throws Exception {
        //查询结点相关信息
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath("/app1");
        //查询接地那信息中的版本信息version(本质上是锁的机制)
        int version = stat.getVersion();
        Stat stat1 = client.setData().withVersion(version).forPath("/app1","haha".getBytes());
    }
​
    //==========================================delete=====================================================================
​
    
    @Test
    public void testDelete() throws Exception {
        //1.删除单个结点
        client.delete().forPath("/app1");
    }
​
    @Test
    public void testDelete2() throws Exception {
        //2.删除带有子节点的结点
        client.delete().deletingChildrenIfNeeded().forPath("/app4");
    }
​
    @Test
    public void testDelete3() throws Exception{
        //3.必须成功删除(为了防止网络抖动,本质就是重试几次)
        client.delete().guaranteed().forPath("/app2");
    }
​
    @Test
    public void testDelete4() throws Exception {
        //4.回调(一知半解)
        client.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(Curatorframework client, CuratorEvent event) throws Exception {
                System.out.println("delete");//提示删除完成
                System.out.println(event);//相关信息
            }
        }).forPath("app4");
    }
}
​
package com.itheima;
​
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
​
import java.nio.charset.StandardCharsets;
import java.util.List;
​
public class curatorWatcherTest {
    //提升作用域
    Curatorframework client;
//==============================================create=========================================================================
    
    @Test
    public void testConnect1(){
        //获取zookeeper client对象的两种方式。
        //1.第一种方式
        
        //重试策略对象
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        //获取client对象(Curatorframework)
        client = CuratorframeworkFactory.newClient("192.168.200.154:2181", 60 * 1000, 15 * 1000, retryPolicy);
        //开启连接
        client.start();
    }
​
    @Before
    public void testConnect2(){
        //2.第二种方式
        //链式编程
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client = CuratorframeworkFactory.builder().connectString("192.168.200.154:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).namespace("itheima").build();//使用名称空间
        client.start();
    }
​
    //创建结点:持久、临时、顺序、数据
    //1.基本创建
    //2.创建结点,带有数据
    //3.设置结点类型
    //4.创建多级结点
    @Test
    public void testCreate1() throws Exception {
        //1.基本创建
        //创建结点时没有指定结点的数据,会默认存储客户端的ip地址
        String path = client.create().forPath("/app22");
        System.out.println(path);
    }
​
    @Test
    public void testCreate2() throws Exception {
        //2.创建结点,带有数据
        String path = client.create().forPath("/app2", "zifushuzu".getBytes(StandardCharsets.UTF_8));
        System.out.println(path);
    }
​
    @Test
    public void testCreate3() throws Exception {
        //3.设置结点类型
        String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(path);
    }
​
    @Test
    public void testCreate4() throws Exception {
        //4.创建多级结点
        String path = client.create().creatingParentContainersIfNeeded().forPath("/app4/p1","nihaoya".getBytes(StandardCharsets.UTF_8));
        System.out.println(path);
    }
​
    @After
    public void close(){
        if(client!=null){
            client.close();
        }
    }
​
    //==================================================get================================================================
​
    
    @Test
    public void testGet1() throws Exception {
        //1.查询结点数据 get
        byte[] data = client.getData().forPath("/app4/p1");
        System.out.println(new String(data));
    }
​
    @Test
    public void testGet2() throws Exception{
        //2.查询结点信息 ls
        List child = client.getChildren().forPath("/");
        System.out.println(child);
    }
​
    @Test
    public void testGet3() throws Exception{
        //3.查询结点状态信息 ls -s
        Stat stat = new Stat();
        System.out.println(stat);//查询前为空
        client.getData().storingStatIn(stat).forPath("/app4");
        System.out.println(stat);//查询结点的状态信息
    }
    //===============================================set===================================================================
​
    
    @Test
    public void testSet() throws Exception {
        Stat stat = client.setData().forPath("/app1", "itcast".getBytes(StandardCharsets.UTF_8));
        System.out.println(stat);
    }
​
    @Test
    public void testSetForVersion() throws Exception {
        //查询结点相关信息
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath("/app1");
        //查询接地那信息中的版本信息version(本质上是锁的机制)
        int version = stat.getVersion();
        Stat stat1 = client.setData().withVersion(version).forPath("/app1","haha".getBytes());
    }
​
    //==========================================delete=====================================================================
​
    
    @Test
    public void testDelete() throws Exception {
        //1.删除单个结点
        client.delete().forPath("/app1");
    }
​
    @Test
    public void testDelete2() throws Exception {
        //2.删除带有子节点的结点
        client.delete().deletingChildrenIfNeeded().forPath("/app4");
    }
​
    @Test
    public void testDelete3() throws Exception{
        //3.必须成功删除(为了防止网络抖动,本质就是重试几次)
        client.delete().guaranteed().forPath("/app2");
    }
​
    @Test
    public void testDelete4() throws Exception {
        //4.回调(一知半解)
        client.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(Curatorframework client, CuratorEvent event) throws Exception {
                System.out.println("delete");//提示删除完成
                System.out.println(event);//相关信息
            }
        }).forPath("app4");
    }
​
​
    //======================================watch========================================================================
    
    @Test
    public void testPathChildrenCache() throws Exception{
        //1.创建监听对象
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app2", true);
​
        //2.绑定监听器
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(Curatorframework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("child node has changed...");
                System.out.println(event);
                //监听子节点的数据变更,并且拿到变更后的数据
                //1.获取数据类型
                PathChildrenCacheEvent.Type type = event.getType();
                //2.判断类型是否是update
                if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                    //PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p2', stat=192,209,1643545922168,1643546628992,2,0,0,0,3,0,192
                    //, data=[49, 49, 49]}}
                    System.out.println("child node update...");
                    byte[] data = event.getData().getData();//根据event的输出格式来理解此处代码
                    System.out.println(new String(data));
                }
            }
        });
​
        //3.开启监听器
        pathChildrenCache.start();
        while(true){
            //死循环
        }
    }
​
    @Test
    public void testNodeCache() throws Exception{
        //1.创建NodeCache对象
        NodeCache nodeCache = new NodeCache(client, "/app1");
        //2.注册监听-理解
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("node has changed...");
                //获取修改结点后的数据
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println(new String(data));
            }
        });
        //3.开启监听,如果设置为true,开启监听时加载缓存数据。
        nodeCache.start(true);
        //死循环用于制造客户端一直连接并监听相关结点的情况
        //这里一直循环,表示客户端一直存活,服务端有响应会返回;
        //否则单元测试结束,表示客户端结束,服务端有结果也无法返回。
        while(true){
            //死循环
        }
    }
​
    //=========================================treeCache==================================================================
    @Test
    public void testTreeCache() throws Exception {
        //1.创建监听器
        TreeCache treeCache = new TreeCache(client, "/app2");
        //2.注册监听
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(Curatorframework client, TreeCacheEvent event) throws Exception {
                System.out.println("treeCache changed...");
                System.out.println(event);
            }
        });
        //3.开启监听
        treeCache.start();
        while (true) {
        }
    }
​
}
​

分布式锁

模拟12306

package com.itheima;
​
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
​
import java.util.concurrent.TimeUnit;
​

public class Ticket12306 implements Runnable{
    private int ticket = 100000000;//票数
    private InterProcessMutex interProcessMutex;//互斥锁
    Curatorframework client;//客户端对象
    public Ticket12306() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client = CuratorframeworkFactory.builder().connectString("192.168.200.154:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy).namespace("itheima").build();//使用名称空间
        client.start();
        interProcessMutex = new InterProcessMutex(client,"/lock");
    }
​
    @Override
    public void run() {
        while(true){
            //加锁
            try {
                interProcessMutex.acquire(3, TimeUnit.SECONDS);
                if (ticket > 0) {
                    System.out.println(Thread.currentThread().getName()+":"+ticket);
                    ticket--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                //释放锁
                try {
                    interProcessMutex.release();//突然理解异常的用处了。
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
​
        }
    }
}
package com.itheima;
​

public class lockTest {
    public static void main(String[] args) {
        Ticket12306 ticket12306 = new Ticket12306();
        //创建客户端
        Thread xiecheng = new Thread(ticket12306, "xiecheng");
        Thread feizhu = new Thread(ticket12306, "feizhu");
        //开启两个线程来运行买票服务
        xiecheng.start();
        feizhu.start();
    }
}
​
zookeeper集群搭建

参见:搭建Zookeeper集群(转黑马官方,补充了一点可能遇到的小坑)_chaofengdev的博客-CSDN博客

zookeeper核心理论

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/724463.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号