栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ZooKeeper : Curator框架Watcher API介绍

ZooKeeper : Curator框架Watcher API介绍

ZooKeeper : Curator框架Watcher API介绍

在之前的博客中,博主已经介绍了Curator框架的Session、Znode以及ACL API:

  • ZooKeeper : Curator框架重试策略和Session API介绍
  • ZooKeeper : Curator框架Znode、ACL API介绍

本篇博客将介绍Curator框架的Watcher API,它简化了Java客户端原生Watcher API的使用,了解后者可以更好地理解前者的实现:

  • ZooKeeper :Java客户端Watcher API介绍

博主使用的Curator框架版本是5.2.0,ZooKeeper版本是3.6.3。

        
            org.apache.curator
            curator-recipes
            5.2.0
        


5.2.0版本的Curator使用3.6.3版本的ZooKeeper,因此它们是兼容的。

Watcher

Watcher是ZooKeeper中非常重要的特性, ZooKeeper上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于ZooKeeper实现分布式锁、集群管理等功能。

Watcher特性:比如当节点数据发生变化的时候,ZooKeeper会产生一个Watcher事件,并且会发送到客户端,客户端收到监听的节点事件后,就可以进行相应的业务处理了。ZooKeeper的Watcher机制,可以分为三个过程:客户端注册Watcher、服务端处理Watcher和客户端回调。

一次性Watcher

在Java客户端原生API中,调用getData、getChildren以及exists这三个方法都可以在节点上留下一次性Watcher,而这些Watcher的类型分别是Data、Children和Data,这些一次性Watcher类型在WatcherType枚举类中定义。

    @Public
    public static enum WatcherType {
        Children(1),
        Data(2),
        Any(3);

        private final int intValue;

        private WatcherType(int intValue) {
            this.intValue = intValue;
        }

        public int getIntValue() {
            return this.intValue;
        }

        public static Watcher.WatcherType fromInt(int intValue) {
            switch(intValue) {
            case 1:
                return Children;
            case 2:
                return Data;
            case 3:
                return Any;
            default:
                throw new RuntimeException("Invalid integer value for conversion to WatcherType");
            }
        }
    }

Java客户端原生API中getData、getChildren以及exists三个方法的一次性Watcher触发条件:

  • getData:一次性Watcher将由在节点上设置数据或删除节点的成功操作触发。
  • exists:一次性Watcher将由创建、删除节点或在节点上设置数据的成功操作触发。
  • getChildren:删除给定路径的节点或在节点下创建、删除子节点的成功操作将触发一次性Watcher。

测试类Application(实现了CuratorWatcher接口,因此Application的实例有Watcher的功能 ):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorframeworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;



public class Application implements CuratorWatcher {

    private static final String SERVER_PROXY = "192.168.31.172:9000";
    private static final int CONNECTION_TIMEOUT_MS = 40000;
    private static final int SESSION_TIMEOUT_MS = 10000;
    private static final String NAMESPACE = "MyNamespace";

    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        Curatorframework curator = CuratorframeworkFactory.builder()
                .connectString(SERVER_PROXY)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .namespace(NAMESPACE)
                .build();

        curator.start();
        assert curator.getState().equals(CuratorframeworkState.STARTED);

        Application watcher = new Application();
        curator.checkExists()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1", "data".getBytes());

        curator.setData()
                .forPath("/father/son/grandson1", "new data".getBytes());
                
        Thread.sleep(10000000);
    }

    @Override
    public void process(WatchedEvent watchedEvent) throws Exception {
        System.out.println(watchedEvent.getPath());
        System.out.println(watchedEvent.getType());
    }
}

输出:

/father/son/grandson1
NodeCreated

Curator框架的checkExists方法对应于Java客户端的exists方法。对节点进行了两次操作(创建和修改节点数据),但Watcher只触发了一次。如果想要多次监听节点事件就需要多次添加这种一次性Watcher:

        Application watcher = new Application();
        curator.checkExists()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1", "data".getBytes());

        curator.checkExists()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.setData()
                .forPath("/father/son/grandson1", "new data".getBytes());

        curator.checkExists()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.delete()
                .forPath("/father/son/grandson1");

输出:

/father/son/grandson1
NodeCreated
/father/son/grandson1
NodeDataChanged
/father/son/grandson1
NodeDeleted

很显然符合预期,想要多次监听事件,就必须多次添加一次性Watcher,而上面的程序由于没有再次添加一次性Watcher,节点的NodeDataChanged事件就不能监听,因为一次性Watcher在节点的NodeCreated事件触发时就被Zookeeper服务端移除了。Curator框架的getData方法对应于Java客户端的getData方法,它们都只能在节点存在的情况下给节点留下一次性Watcher,因此它们留下的一次性Watcher肯定不能监听该节点的创建事件,其他事件的监听与checkExists方法的一次性Watcher类似,这里不再赘述。

        Application watcher = new Application();

        curator.getData()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1", "data".getBytes());


Curator框架的getChildren方法对应于Java客户端的getChildren方法,给指定节点留下一次性Watcher的方式也是类似的,都是通过usingWatcher方法,删除给定路径的节点或在节点下创建、删除子节点的成功操作将触发这个一次性Watcher(临时节点不能创建子节点)。

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1", "data".getBytes());

        Application watcher = new Application();
        curator.getChildren()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1/test", "test".getBytes());

        curator.getChildren()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.delete()
                .forPath("/father/son/grandson1/test");

        curator.getChildren()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.delete()
                .forPath("/father/son/grandson1");

输出:

/father/son/grandson1
NodeChildrenChanged
/father/son/grandson1
NodeChildrenChanged
/father/son/grandson1
NodeDeleted

在Curator框架中可以使用两种Watcher,Java客户端提供的Watcher接口和Curator框架提供的CuratorWatcher接口。而它们的方法定义是类似的:

package org.apache.curator.framework.api;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public interface CuratorWatcher
{
    public void process(WatchedEvent event) throws Exception;
}
package org.apache.zookeeper;

import org.apache.yetus.audience.InterfaceAudience.Public;

@Public
public interface Watcher {
    void process(WatchedEvent var1);
    ...
}

在Curator框架中这两种Watcher的实现都可以使用。

    T usingWatcher(Watcher watcher);
    T usingWatcher(CuratorWatcher watcher);

当需要一直监听指定节点的事件时,一次性Watcher就不太方便了,这时就需要持久Watcher。

持久Watcher

在Java客户端原生API中,调用addWatch方法可以在节点上添加持久Watcher(PERSISTENT和PERSISTENT_RECURSIVE),并且这些Watcher的类型是Any。在Curator框架中,可以如下所示给节点添加持久Watcher(通过.watchers().add()链式调用):

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1", "data".getBytes());

        Application watcher = new Application();
        curator.watchers()
                .add()
                .withMode(AddWatchMode.PERSISTENT)
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1/test", "test".getBytes());

        curator.setData()
                .forPath("/father/son/grandson1/test", "new test".getBytes());

输出:

/father/son/grandson1
NodeChildrenChanged

AddWatchMode.PERSISTENT类型的Watcher只会监听注册节点的相关事件(节点数据更改NodeDataChanged、子节点列表更改NodeChildrenChanged以及节点删除NodeDeleted等),而不会监听注册节点的子节点的相关事件(不会引起子节点列表更改的事件)。如果将AddWatchMode.PERSISTENT换成AddWatchMode.PERSISTENT_RECURSIVE:

        curator.watchers()
                .add()
                .withMode(AddWatchMode.PERSISTENT_RECURSIVE)
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

输出:

/father/son/grandson1/test
NodeCreated
/father/son/grandson1/test
NodeDataChanged

AddWatchMode.PERSISTENT_RECURSIVE类型的Watcher不仅监听注册节点的相关事件(节点数据更改NodeDataChanged和节点删除NodeDeleted等,而子节点列表更改NodeChildrenChanged其实就是子节点的节点创建NodeCreated和节点删除NodeDeleted,应该触发的是子节点的事件监听),还会递归地监听注册节点的所有子节点的相关事件。

AddWatchMode枚举类:

删除Watcher

删除类型为Any的Watcher,也会一起删除类型为Children和Data的Watcher,而删除类型为Children或Data的Watcher,只能删除对应类型的Watcher。

删除指定节点类型为Data的所有Watcher:

        Application watcher1 = new Application();
        Application watcher2 = new Application();

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1", "data".getBytes());

        curator.getChildren()
                .usingWatcher(watcher1)
                .forPath("/father/son/grandson1");

        curator.getData()
                .usingWatcher(watcher2)
                .forPath("/father/son/grandson1");

        curator.watchers()
                .removeAll()
                .ofType(Watcher.WatcherType.Data)
                .forPath("/father/son/grandson1"); 
    @Override
    public void process(WatchedEvent watchedEvent) throws Exception {
        System.out.println(watchedEvent.getPath());
        System.out.println(watchedEvent.getType());
        System.out.println(this);
    }

输出:

/father/son/grandson1
DataWatchRemoved
com.kaven.zookeeper.Application@5af5ec05

删除指定节点类型为Any的所有Watcher:

        curator.watchers()
                .removeAll()
                .ofType(Watcher.WatcherType.Any)
                .forPath("/father/son/grandson1");

输出:

/father/son/grandson1
ChildWatchRemoved
com.kaven.zookeeper.Application@61613ed9
/father/son/grandson1
DataWatchRemoved
com.kaven.zookeeper.Application@5af5ec05

删除指定节点类型为Any的指定Watcher:

        curator.watchers()
                .remove(watcher1)
                .ofType(Watcher.WatcherType.Any)
                .forPath("/father/son/grandson1");

输出:

/father/son/grandson1
ChildWatchRemoved
com.kaven.zookeeper.Application@5ebb066a

这些输出都符合预期。Curator框架Watcher API介绍就到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/650249.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号