栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【Java从0到架构师】Zookeeper 应用 - Java 客户端操作、服务器动态感知、分布式锁业务处理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Java从0到架构师】Zookeeper 应用 - Java 客户端操作、服务器动态感知、分布式锁业务处理

分布式基石 Zookeeper 框架全面剖析
  • Java 客户端操作
    • Java 客户端 API
  • 服务器的动态感知
    • 服务注册
    • 服务发现
  • 分布式锁业务处理
    • 单机环境(一个虚拟机中)
    • 分布式环境_同名节点
    • 分布式环境_顺序节点

Java 从 0 到架构师目录:【Java从0到架构师】学习记录

Zookeeper 连接状态:

Java 客户端操作

Java 客户端操作:

  • 自带的 zkclient

    org.apache.zookeeper
    zookeeper
    3.6.0

  • Apache 开源的 Curator

    org.apache.curator
    curator-framework
    4.3.0


    org.apache.curator
    curator-client
    4.3.0

  • Apache 开源的 ZkClient (com.101tec)

    com.101tec
    zkclient
    0.11

Java 客户端 API

我们使用 Zookeeper 自带的 zkclient 来操作:

创建会话

@SpringBootApplication
public class ZookeeperApiDemoApplication {

    private  static  String ZK_SERVER_ADDR="192.168.48.100:2181,192.168.48.100:2182,192.168.48.100:2183";
    private static  int SESSION_TIMEOUT=30000;

    public static void main(String[] args) {
        SpringApplication.run(ZookeeperApiDemoApplication.class, args);
    }

    //创建一个zookeeper的连接
    @Bean
    public ZooKeeper zooKeeper() throws Exception{
        // 第一个参数: 连接地址和端口 第二个参数: 会话超时时间, 第三个参数: 事件监听程序
        ZooKeeper zooKeeper = new ZooKeeper(ZK_SERVER_ADDR, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("event = " + event);
                if(event.getState()== Event.KeeperState.SyncConnected){
                    System.out.println("zookeeper客户端连接成功");
                }
            }
        });
        return zooKeeper;
    }

}

创建节点

@RequestMapping("createNode")
public String createNode(String path, String data, String type) throws Exception{
    String result = zooKeeper.create(path, 
    								 data.getBytes(),
    								 ZooDefs.Ids.OPEN_ACL_UNSAFE, 
    								 CreateMode.valueOf(type));
    return result;
}

获取节点中的数据

// 同步获取数据
@RequestMapping("getData")
public String getData(String path) throws Exception {
    // 1 先去查询版本信息  如果没有, 返回的是一个null
    Stat stat = zooKeeper.exists(path, false);
    // 同步获取数据
    byte[] data = zooKeeper.getData(path, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("event = " + event);
        }
    }, stat);
    System.out.println("new String(data) = " + new String(data));
    return new String(data);
}

// 异步数据处理
@RequestMapping("getDataAsync")
public String getDataAsync(String path) throws Exception{
    // 1 先去查询版本信息
    Stat stat = zooKeeper.exists(path, false);
    zooKeeper.getData(path, false, new AsyncCallback.DataCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
            System.out.println("异步处理回调数据");
            System.out.println("收到的数据:"+new String(data));
            System.out.println("ctx = " + ctx);
        }
    }, "测试数据");

    return "异步获取数据";
}


// 获取子节点列表
@RequestMapping("getChildren")
public List getChildren(String path) throws Exception{
    List children = zooKeeper.getChildren(path, false);
    return children;
}

判断节点是否存在

@RequestMapping("exists")
public  boolean exists(String path) throws Exception{
    Stat stat = zooKeeper.exists(path, false);
    return stat != null ;
}

删除节点

@RequestMapping("delete")
public boolean delete(String path) throws Exception{
    Stat stat = zooKeeper.exists(path, false);
    if (stat != null){
        zooKeeper.delete(path,stat.getVersion());
    }
    return true;
}

更新数据

@RequestMapping("update")
public  boolean update(String path,String data) throws Exception{
    Stat stat = zooKeeper.exists(path, false);
    if (stat != null){
        zooKeeper.setData(path, data.getBytes(), stat.getVersion());
    }
    return true;
}

事件处理

绑定一次事件:

@RequestMapping("addWatch1")
public String addWatch1(String path) throws Exception{
    Stat stat = zooKeeper.exists(path, false);
    // 定义一个监视器对象
    Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) { // 数据改变事件,而且还是一次性
            System.out.println("事件类型:" + event.getType());
            System.out.println("数据发生改变, 请及时更新");
            try {
                byte[] data = zooKeeper.getData(path, this, stat);
                System.out.println("更新后的数据:" + new String(data));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    zooKeeper.getData(path, watcher, stat);
    return "success";
}

绑定永久事件:

@RequestMapping("addWatch2")
public String addWatch2(String path) throws Exception{
    Stat stat = zooKeeper.exists(path, false);
    // 只是获取数据, 没有绑定事件
    byte[] data = zooKeeper.getData(path, null, stat);
    System.out.println("获取到数据:" + new String(data));
    // 绑定永久的事件  --> 1 数据变化事件  2  子节点改变事件
    zooKeeper.addWatch(path, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("event = " + event);
            // 数据改变事件
            if (event.getType() == Event.EventType.NodeDataChanged){
                try {
                    // 重新获取数据
                    Stat stat = zooKeeper.exists(path, false);
                    // 只是获取数据, 没有绑定事件
                    byte[] data = zooKeeper.getData(path, null, stat);
                    System.out.println("更新的数据:" + new String(data));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            // 子节点改变事件
            } else if (event.getType() == Event.EventType.NodeChildrenChanged){
                // 重新获取子节点列表
                System.out.println("子节点数据发生改变");
            }
        }
    }, AddWatchMode.PERSISTENT);
    return "success";
}

递归绑定事件:对于创建的节点以及子节点都绑定事件

@RequestMapping("addWatch")
public List addWatch(String path) throws Exception{
    // 1 先获取所有的子节点
    List children = zooKeeper.getChildren(path, false);
    // 2 绑定一个监听事件
    zooKeeper.addWatch(path, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeChildrenChanged){
                System.out.println("子节点数据发送改变");
                System.out.println("重新获取子节点数据");
                try {
                    List children1 = zooKeeper.getChildren(path, false);
                    System.out.println("children1 = " + children1);
                    System.out.println("=========================");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else if (event.getType() == Event.EventType.NodeDataChanged){
                System.out.println("节点数据发生改变");
                try {
                    byte[] data = zooKeeper.getData(path, false, new Stat());
                    System.out.println("获取到的数据是:" + new String(data));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }, AddWatchMode.PERSISTENT_RECURSIVE);
    return children;
}
服务器的动态感知

服务注册

创建一个 SpringBoot 项目

  1. 导入对应的依赖包

    org.apache.zookeeper
    zookeeper
    3.6.0

  1. 实例化一个 Zookeeper 的客户端连接

  2. 在连接成功以后,开始创建一个对应的临时顺序节点,注册自己的 ip 和端口

启动项目,连接zk,并且注册地址和端口信息

  • 配置文件
server.port=8888
server.host=192.168.48.1
  • 实现代码
// 注册服务
@SpringBootApplication
public class ZookeeperSeckillServerApplication{

    private static String ZK_SERVER_ADDR = "192.168.48.100:2181,192.168.48.100:2182,192.168.48.100:2183";
    private static int SESSION_TIMEOUT = 30000;
    private static String PATH = "/servers";
    private static String SUB_PATH = "/seckillServer";
    
    @Value("${server.host}")
    private String host;
    @Value("${server.port}")
    private String port;
    
    private ZooKeeper zooKeeper;

    public static void main(String[] args) {
        SpringApplication.run(ZookeeperSeckillServerApplication.class, args);
    }

    @Bean
    public ZooKeeper zooKeeper() throws  Exception {
    	  // 参数1: 连接地址和端口, 参数2: 会话超过事件, 参数3:事件监听程序
          zooKeeper = new ZooKeeper(SERVER_ADDR, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("event = " + event);
                if(event.getState() == Event.KeeperState.SyncConnected){
                    System.out.println("zookeeper客户端连接成功");
                    //注册对应的信息
                    try {
                        zooKeeper.create(PATH + SUB_PATH, (host + ":" + port).getBytes(),
                                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        return zooKeeper;
    }
}
服务发现

创建一个 SpringBoot 项目

  1. 导入依赖

    org.apache.zookeeper
    zookeeper
    3.6.0

  1. 创建一个 Zookeeper 客户端连接
  2. 连接成功以后,获取地址列表
  3. 绑定子节点改变事件(每次改变获取最新的的服务地址)

启动项目,连接 zk,并且获取服务地址列表

注册永久的事件监听:

@SpringBootApplication
public class ZookeeperOrderServerApplication {

    private static String ZK_SERVER_ADDR = "192.168.48.100:2181,192.168.48.100:2182,192.168.48.100:2183";
    private static int SESSION_TIMEOUT = 30000;
    private static String PATH = "/servers";

    public static List addrList;

    // volatile: 保证在多线程之间的变量的可见性
    private volatile ZooKeeper zooKeeper;

    public static void main(String[] args) {
        SpringApplication.run(ZookeeperOrderServerApplication.class, args);
    }
    
    @Bean
    public ZooKeeper zooKeeper() throws Exception{
        // 第一个参数: 连接地址和端口 第二个参数: 会话超时时间, 第三个参数: 事件监听程序
        zooKeeper = new ZooKeeper(ZK_SERVER_ADDR, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("event = " + event);
                if(event.getState()== Event.KeeperState.SyncConnected){
                    System.out.println("zookeeper客户端连接成功");
                    try {
                        //1 获取对应的地址列表
                        getData();
                        //2 绑定永久的事件监听
                        zooKeeper.addWatch(PATH, new Watcher() {
                            @Override
                            public void process(WatchedEvent event) {// 开启另外的线程处理
                                try {
                                    getData();
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }
                        }, AddWatchMode.PERSISTENT);

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            //获取数据
            private void getData() throws KeeperException, InterruptedException {
                List serverAddr = zooKeeper.getChildren(PATH, null);
                List tempList = new ArrayList<>();
                for (String path : serverAddr) {
                    //获取节点路径数据
                    byte[] data = zooKeeper.getData(PATH + "/" + path, null, new Stat());
                    String addrInfo = new String(data);
                    // 把数据添加到临时列表
                    tempList.add(addrInfo);
                }
                addrList = tempList;
                System.out.println("获取到秒杀服务的最新地址n" + addrList);
            }
        });
        return zooKeeper;
    }
}
分布式锁业务处理

为什么程序中需要锁?

  • 多任务环境:多个任务同时执行,可以是多线程,也可以是多进程
  • 多个任务的资源共享操作:所有的任务都需要对同一资源进行写操作
  • 对资源的访问是互斥的:对于资源的访问,多个任务同时执行,同一时间只能一个任务访问资源,其他的任务处于阻塞状态

锁的基本概念:

  • 竞争锁:任务通过竞争获取锁才能对该资源进行操作
    公平竞争:按照一定的顺序,先来先执行
    非公平竞争:没有顺序,不管先后顺序执行
  • 占有锁:当有一个任务在对资源进行更新时,其他任务都不可以对这个资源进行操作
  • 任务阻塞
  • 释放锁:直到该任务完成更新,释放资源

锁的应用场景:

  • 单机环境(一个虚拟机中)
  • 分布式环境_同名节点
  • 分布式环境_顺序节点
单机环境(一个虚拟机中)

业务实现:

// 订单ID生成器
public class OrderIDGenerator {
    private int count = 0;
    public synchronized String getId(){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-dd-mm");
        String format = sdf.format(new Date());
        try {
        	// 模拟网络延迟
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return format + "-" + (++count);
    }
}

测试实现:

public class OrderService implements Runnable {
    private OrderIDGenerator orderIDGenerator = null;
    private static CountDownLatch countDownLatch = new CountDownLatch(50);
    private static Set result = new HashSet<>(50);

    public OrderService(OrderIDGenerator orderIDGenerator) {
        this.orderIDGenerator = orderIDGenerator;
    }

    public static void main(String[] args) throws Exception{
        OrderIDGenerator idGenerator = new OrderIDGenerator();
        
        System.out.println("开始模拟多线程生成订单号");
        for(int i = 0; i < 50; i++){
            new Thread(new OrderService(idGenerator)).start();
        }
        countDownLatch.await();
        System.out.println("生成的订单号个数:" + result.size());
        System.out.println("======================");
        for (String order : result) {
            System.out.println(order);
        }
    }

    @Override
    public void run() {
        result.add(orderIDGenerator.getId());
        countDownLatch.countDown();
    }
}

分布式环境_同名节点


分布式锁业务流程分析:

分布式锁流程图:

业务实现:

@RestController
public class OrderController {

    private  RestTemplate restTemplate = new RestTemplate();

    @Autowired
    private ZooKeeper zooKeeper;

    private String path = "/locks";
    private String node = "/orderIdLock";

    @RequestMapping("createOrder")
    public String createOrder() throws Exception{
        // 获取id
        if (tryLock()) {
            // 调用业务方法
            String id = restTemplate.getForObject("http://localhost:8080/getId", String.class);
            System.out.println("获取到的id:" + id);
            // 释放锁
            unlock();
        } else {
            waitLock();
        }
        return "success";
    }
    
    // 竞争锁资源: 尝试获取id, 如果获取到了, 返回true, 否则返回false
    public boolean tryLock(){
        try {
            // 因为不是顺序节点, 对于同一个路径, 只能创建一次
            String path = zooKeeper.create(this.path + this.node,
                    null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    
    // 释放锁资源
    public void unlock(){
        try {
            Stat stat = zooKeeper.exists(this.path + this.node, false);
            zooKeeper.delete(this.path + this.node, stat.getVersion());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    // 阻塞状态
    public  void waitLock(){
        try {
            zooKeeper.getChildren(this.path, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    try {
                        createOrder(); // 重新创建订单
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
分布式环境_顺序节点

分布式锁流程:

@RestController
public class Order02Controller {

    private RestTemplate restTemplate = new RestTemplate();

    @Autowired
    private ZooKeeper zooKeeper;

    private String path = "/locks02";
    private String node = "/orderIdLock";

    @RequestMapping("createOrder02")
    public String createOrder() throws Exception {
        String currentPath = zooKeeper.create(this.path + this.node,
                null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        currentPath = currentPath.substring(currentPath.lastIndexOf("/") + 1);
        //获取id
        if (tryLock(currentPath)) {
            // 调用业务方法
            String id = restTemplate.getForObject("http://localhost:8080/getId", String.class);
            System.out.println("获取到的id:" + id);
            // 释放锁
            unlock(currentPath);
        } else {
            waitLock(currentPath);
        }
        return "success";
    }

    //尝试获取id, 如果获取到了, 返回true, 否则返回false
    //竞争锁资源
    public boolean tryLock(String currentPath) {
        try {
            //获取到所有的节点
            List children = zooKeeper.getChildren(this.path, false);
            Collections.sort(children);
            if (StringUtils.pathEquals(currentPath, children.get(0))) {
                return true;
            } else {
                return false;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    //释放锁资源
    public void unlock(String currentPath) {
        try {
            Stat stat = zooKeeper.exists(this.path + "/" + currentPath, false);
            zooKeeper.delete(this.path + "/" + currentPath, stat.getVersion());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //阻塞状态
    public void waitLock(String currentPath) {
        try {
            CountDownLatch count = new CountDownLatch(1);
            List children = zooKeeper.getChildren(this.path, false);
            //获取到前一个节点
            Collections.sort(children);
            int index = children.indexOf(currentPath);
            if (index > 0) {
                String preNode = children.get(index - 1);
                //前一个节点删除操作
                zooKeeper.getData(this.path + "/" + preNode, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getType() == Event.EventType.NodeDeleted) {
                            try {
                                String id = restTemplate.getForObject("http://localhost:8080/getId", String.class);
                                System.out.println("获取到的id:" + id);
                                //释放锁
                                unlock(currentPath);
                                count.countDown();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }, new Stat());
            }
            count.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/281466.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号