栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ZooKeeper : Curator框架之数据缓存与监听CuratorCache

ZooKeeper : Curator框架之数据缓存与监听CuratorCache

CuratorCache

CuratorCache会试图将来自节点的数据保存在本地缓存中。 可以缓存指定的单个节点,也可以缓存以指定节点为根的整个子树(默认缓存方案)。可以给CuratorCache实例注册监听器,当相关节点发生更改时会接收到通知, 将响应更新、创建、删除等事件。

很多基于ZooKeeper的框架,就是使用CuratorCache来缓存相关节点的数据,比如ElasticJob,这样大部分节点的数据可以通过内存来获取,提高了性能,并且可以很方便地监听相关节点的事件。

CuratorCache接口源码:

public interface CuratorCache extends Closeable, CuratorCacheAccessor
{
    
    enum Options
    {
        
        SINGLE_NODE_CACHE,

        
        COMPRESSED_DATA,

        
        DO_NOT_CLEAR_ON_CLOSE
    }

    
    static CuratorCache build(Curatorframework client, String path, Options... options)
    {
        return builder(client, path).withOptions(options).build();
    }

    
    static CuratorCacheBuilder builder(Curatorframework client, String path)
    {
        return new CuratorCacheBuilderImpl(client, path);
    }

    
    static CuratorCacheBridgeBuilder bridgeBuilder(Curatorframework client, String path)
    {
        return new CuratorCacheBridgeBuilderImpl(client, path);
    }

    
    void start();

    
    @Override
    void close();

    
    Listenable listenable();

    
    @Override
    Optional get(String path);

    
    @Override
    int size();

    
    @Override
    Stream stream();
}

CuratorCache接口通过build静态方法返回的CuratorCache实例是其实现类的实例(最终会调用CuratorCacheBuilderImpl类的build方法):

    @Override
    public CuratorCache build()
    {
        return new CuratorCacheImpl(client, storage, path, options, exceptionHandler);
    }

而只需要使用CuratorCache接口即可(创建的实例就是CuratorCacheImpl类的实例),而CuratorCacheImpl类中可用的方法基本上是实现CuratorCache接口中的方法。

CuratorCacheListener

CuratorCacheListener是CuratorCache事件的监听器。

CuratorCacheListener接口源码(函数式接口):

@FunctionalInterface
public interface CuratorCacheListener
{
    
    enum Type
    {
        
        NODE_CREATED,

        
        NODE_CHANGED,

        
        NODE_DELETED
    }

    
    void event(Type type, ChildData oldData, ChildData data);

    
    default void initialized()
    {
        // NOP
    }

    
    static CuratorCacheListenerBuilder builder()
    {
        return new CuratorCacheListenerBuilderImpl();
    }
}

通过CuratorCacheListenerBuilderImpl的实例可以给CuratorCache添加各种监听器。

演示 SINGLE_NODE_CACHE(单节点)
package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.imps.CuratorframeworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import java.util.Optional;

public class Application {

    private static final String SERVER_PROXY = "192.168.31.175:9000";
    private static final int CONNECTION_TIMEOUT_MS = 40000;
    private static final int SESSION_TIMEOUT_MS = 10000;
    private static final String NAMESPACE = "MyNamespace";

    public static void main(String[] args) throws Exception {
        // 重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 创建Curatorframework实例
        Curatorframework curator = CuratorframeworkFactory.builder()
                .connectString(SERVER_PROXY)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .namespace(NAMESPACE)
                .build();

        curator.start();
        assert curator.getState().equals(CuratorframeworkState.STARTED);
        curator.blockUntilConnected();

        if(curator.checkExists().forPath("/father") != null) {
            curator.delete().deletingChildrenIfNeeded().forPath("/father");
        }
        
        // 创建CuratorCache实例,基于路径/father/son/grandson1(这里说的路径都是基于命名空间下的路径)
        // 缓存构建选项是SINGLE_NODE_CACHE
        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",
                CuratorCache.Options.SINGLE_NODE_CACHE);

        // 创建一系列CuratorCache监听器,都是通过lambda表达式指定
        CuratorCacheListener listener = CuratorCacheListener.builder()
                // 初始化完成时调用
                .forInitialized(() -> System.out.println("[forInitialized] : Cache initialized"))
                // 添加或更改缓存中的数据时调用
                .forCreatesAndChanges(
                        (oldNode, node) -> System.out.printf("[forCreatesAndChanges] : Node changed: Old: [%s] New: [%s]n",
                                oldNode, node)
                )
                // 添加缓存中的数据时调用
                .forCreates(childData -> System.out.printf("[forCreates] : Node created: [%s]n", childData))
                // 更改缓存中的数据时调用
                .forChanges(
                        (oldNode, node) -> System.out.printf("[forChanges] : Node changed: Old: [%s] New: [%s]n",
                                oldNode, node)
                )
                // 删除缓存中的数据时调用
                .forDeletes(childData -> System.out.printf("[forDeletes] : Node deleted: data: [%s]n", childData))
                // 添加、更改或删除缓存中的数据时调用
                .forAll((type, oldData, data) -> System.out.printf("[forAll] : type: [%s] [%s] [%s]n", type, oldData, data))
                .build();

        // 给CuratorCache实例添加监听器
        cache.listenable().addListener(listener);

        // 启动CuratorCache
        cache.start();

        // 创建节点/father/son/grandson1
        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1", "data".getBytes());
        
        // 创建节点/father/son/grandson1/test
        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1/test", "test".getBytes());
        
        // 创建节点/father/son/grandson1/test/test2
        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1/test/test2", "test2".getBytes());
       
        // 更改节点/father/son/grandson1的数据
        curator.setData()
                .forPath("/father/son/grandson1", "new data".getBytes());

        // 更改节点/father/son/grandson1/test的数据
        curator.setData()
                .forPath("/father/son/grandson1/test", "new test".getBytes());

        // 删除节点/father/son/grandson1
        curator.delete()
                .deletingChildrenIfNeeded()
                .forPath("/father/son/grandson1");

        Thread.sleep(10000000);
    }
}

输出:

// 初始化完成
[forInitialized] : Cache initialized

// 创建节点/father/son/grandson1,即添加缓存数据
[forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}]
[forCreates] : Node created: [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}]
[forAll] : type: [NODE_CREATED] [null] [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}]

// 更改节点/father/son/grandson1,即更改缓存数据
[forCreatesAndChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}] New: [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}] New: [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forAll] : type: [NODE_CHANGED] [ChildData{path='/father/son/grandson1', stat=198306,198306,1640848445204,1640848445204,0,1,0,0,4,1,198307
, data=[100, 97, 116, 97]}] [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]

// 删除节点/father/son/grandson1,即删除缓存数据
[forDeletes] : Node deleted: data: [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forAll] : type: [NODE_DELETED] [ChildData{path='/father/son/grandson1', stat=198306,198309,1640848445204,1640848445218,1,1,0,0,8,1,198307
, data=[110, 101, 119, 32, 100, 97, 116, 97]}] [null]

很显然/father/son/grandson1的子节点并没有被缓存(因为缓存构建选项是SINGLE_NODE_CACHE),因此子节点的相关操作并没有被监听。

默认(子树)

如果使用默认选项构建缓存,即:

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1");

输出:

// 初始化完成
[forInitialized] : Cache initialized

// 创建节点/father/son/grandson1,即添加缓存数据
[forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}]
[forCreates] : Node created: [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}]
[forAll] : type: [NODE_CREATED] [null] [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}]

// 创建节点/father/son/grandson1/test,即添加缓存数据
[forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,0,0,0,4,0,198325
, data=[116, 101, 115, 116]}]
[forCreates] : Node created: [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,0,0,0,4,0,198325
, data=[116, 101, 115, 116]}]
[forAll] : type: [NODE_CREATED] [null] [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,0,0,0,4,0,198325
, data=[116, 101, 115, 116]}]

// 创建节点/father/son/grandson1/test/test2,即添加缓存数据
[forCreatesAndChanges] : Node changed: Old: [null] New: [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}]
[forCreates] : Node created: [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}]
[forAll] : type: [NODE_CREATED] [null] [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}]

// 更改节点/father/son/grandson1,即更改缓存数据
[forCreatesAndChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}] New: [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}] New: [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forAll] : type: [NODE_CHANGED] [ChildData{path='/father/son/grandson1', stat=198324,198324,1640854493222,1640854493222,0,1,0,0,4,1,198325
, data=[100, 97, 116, 97]}] [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]

// 更改节点/father/son/grandson1/test,即更改缓存数据
[forCreatesAndChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,1,0,0,4,1,198326
, data=[116, 101, 115, 116]}] New: [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}]
[forChanges] : Node changed: Old: [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,1,0,0,4,1,198326
, data=[116, 101, 115, 116]}] New: [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}]
[forAll] : type: [NODE_CHANGED] [ChildData{path='/father/son/grandson1/test', stat=198325,198325,1640854493226,1640854493226,0,1,0,0,4,1,198326
, data=[116, 101, 115, 116]}] [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}]

// 删除节点/father/son/grandson1/test/test2,即删除缓存数据
[forDeletes] : Node deleted: data: [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}]
[forAll] : type: [NODE_DELETED] [ChildData{path='/father/son/grandson1/test/test2', stat=198326,198326,1640854493230,1640854493230,0,0,0,0,5,0,198326
, data=[116, 101, 115, 116, 50]}] [null]

// 删除节点/father/son/grandson1/test,即删除缓存数据
[forDeletes] : Node deleted: data: [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}]
[forAll] : type: [NODE_DELETED] [ChildData{path='/father/son/grandson1/test', stat=198325,198328,1640854493226,1640854493240,1,1,0,0,8,1,198326
, data=[110, 101, 119, 32, 116, 101, 115, 116]}] [null]

// 删除节点/father/son/grandson1,即删除缓存数据
[forDeletes] : Node deleted: data: [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}]
[forAll] : type: [NODE_DELETED] [ChildData{path='/father/son/grandson1', stat=198324,198327,1640854493222,1640854493237,1,1,0,0,8,1,198325
, data=[110, 101, 119, 32, 100, 97, 116, 97]}] [null]

很显然/father/son/grandson1的子节点也被缓存了,因此子节点的相关操作可以被监听。

NodeCacheListener

是一种桥接监听器,可以将旧式监听器NodeCacheListener(因为NodeCache类已经标记@Deprecated注解)与CuratorCache重用。

    @Override
    public CuratorCacheListenerBuilder forNodeCache(NodeCacheListener listener)
    {
        listeners.add(new NodeCacheListenerWrapper(listener));
        return this;
    }

通过lambda表达式指定NodeCacheListener实例,该实例不接收任何参数。

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",
                CuratorCache.Options.SINGLE_NODE_CACHE);

        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forNodeCache(() -> System.out.println("forNodeCache"))
                .build();

输出:

// 创建节点/father/son/grandson1
forNodeCache
// 更改节点/father/son/grandson1
forNodeCache
// 删除节点/father/son/grandson1
forNodeCache

使用默认选项构建缓存:

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1");
// 创建节点/father/son/grandson1
forNodeCache
// 创建节点/father/son/grandson1/test
forNodeCache
// 创建节点/father/son/grandson1/test/test2
forNodeCache
// 更改节点/father/son/grandson1
forNodeCache
// 更改节点/father/son/grandson1/test
forNodeCache
// 删除节点/father/son/grandson1/test/test2
forNodeCache
// 删除节点/father/son/grandson1/test
forNodeCache
// 删除节点/father/son/grandson1
forNodeCache

很显然该监听器监听缓存中的所有节点(不需要知道这些节点发生事件的数据)。

PathChildrenCacheListener

是一种桥接监听器,可以将旧式监听器PathChildrenCacheListener(因为PathChildrenCache类已经标记@Deprecated注解)与CuratorCache重用。该监听器需要指定根路径,以便仅为此路径的子级而不是路径本身桥接事件。

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",
                CuratorCache.Options.SINGLE_NODE_CACHE);

        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forPathChildrenCache("/father/son/grandson1", curator, (client, event) -> {
                    System.out.println(event);
                })
                .build();

输出:

// 初始化完成
PathChildrenCacheEvent{type=INITIALIZED, data=null}

使用默认选项构建缓存:

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1");

输出:

// 初始化完成
PathChildrenCacheEvent{type=INITIALIZED, data=null}

// 创建节点/father/son/grandson1/test
PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/father/son/grandson1/test', stat=198507,198507,1640856689919,1640856689919,0,0,0,0,4,0,198507
, data=[116, 101, 115, 116]}}

// 创建节点/father/son/grandson1/test/test2
PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/father/son/grandson1/test/test2', stat=198508,198508,1640856689922,1640856689922,0,0,0,0,5,0,198508
, data=[116, 101, 115, 116, 50]}}

// 更改节点/father/son/grandson1/test
PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/father/son/grandson1/test', stat=198507,198510,1640856689919,1640856689932,1,1,0,0,8,1,198508
, data=[110, 101, 119, 32, 116, 101, 115, 116]}}

// 删除节点/father/son/grandson1/test/test2
PathChildrenCacheEvent{type=CHILD_REMOVED, data=ChildData{path='/father/son/grandson1/test/test2', stat=198508,198508,1640856689922,1640856689922,0,0,0,0,5,0,198508
, data=[116, 101, 115, 116, 50]}}

// 删除节点/father/son/grandson1/test
PathChildrenCacheEvent{type=CHILD_REMOVED, data=ChildData{path='/father/son/grandson1/test', stat=198507,198510,1640856689919,1640856689932,1,1,0,0,8,1,198508
, data=[110, 101, 119, 32, 116, 101, 115, 116]}}

很显然该监听器监听缓存中指定根节点的所有子节点,并且不包括根节点。

TreeCacheListener

是一种桥接监听器,可以将旧式监听器TreeCacheListener(因为TreeCache类已经标记@Deprecated注解)与CuratorCache重用。

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1",
                CuratorCache.Options.SINGLE_NODE_CACHE);

        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forTreeCache(curator, (client, event) -> {
                    System.out.println(event);
                })
                .build();

输出:

// 初始化完成
TreeCacheEvent{type=INITIALIZED, data=null}

// 创建节点/father/son/grandson1
TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/father/son/grandson1', stat=198523,198523,1640857107063,1640857107063,0,1,0,0,4,1,198524
, data=[100, 97, 116, 97]}}

// 更改节点/father/son/grandson1
TreeCacheEvent{type=NODE_UPDATED, data=ChildData{path='/father/son/grandson1', stat=198523,198526,1640857107063,1640857107072,1,1,0,0,8,1,198524
, data=[110, 101, 119, 32, 100, 97, 116, 97]}}

// 删除节点/father/son/grandson1
TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/father/son/grandson1', stat=198523,198526,1640857107063,1640857107072,1,1,0,0,8,1,198524
, data=[110, 101, 119, 32, 100, 97, 116, 97]}}

使用默认选项构建缓存:

        CuratorCache cache = CuratorCache.build(curator, "/father/son/grandson1");

输出:

// 初始化完成
TreeCacheEvent{type=INITIALIZED, data=null}

// 创建节点/father/son/grandson1
TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/father/son/grandson1', stat=198540,198540,1640857267893,1640857267893,0,1,0,0,4,1,198541
, data=[100, 97, 116, 97]}}

// 创建节点/father/son/grandson1/test
TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/father/son/grandson1/test', stat=198541,198541,1640857267896,1640857267896,0,0,0,0,4,0,198541
, data=[116, 101, 115, 116]}}

// 创建节点/father/son/grandson1/test/test2
TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/father/son/grandson1/test/test2', stat=198542,198542,1640857267899,1640857267899,0,0,0,0,5,0,198542
, data=[116, 101, 115, 116, 50]}}

// 更改节点/father/son/grandson1
TreeCacheEvent{type=NODE_UPDATED, data=ChildData{path='/father/son/grandson1', stat=198540,198543,1640857267893,1640857267907,1,1,0,0,8,1,198541
, data=[110, 101, 119, 32, 100, 97, 116, 97]}}

// 更改节点/father/son/grandson1/test
TreeCacheEvent{type=NODE_UPDATED, data=ChildData{path='/father/son/grandson1/test', stat=198541,198544,1640857267896,1640857267910,1,1,0,0,8,1,198542
, data=[110, 101, 119, 32, 116, 101, 115, 116]}}

// 删除节点/father/son/grandson1/test/test2
TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/father/son/grandson1/test/test2', stat=198542,198542,1640857267899,1640857267899,0,0,0,0,5,0,198542
, data=[116, 101, 115, 116, 50]}}

// 删除节点/father/son/grandson1/test
TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/father/son/grandson1/test', stat=198541,198544,1640857267896,1640857267910,1,1,0,0,8,1,198542
, data=[110, 101, 119, 32, 116, 101, 115, 116]}}

// 删除节点/father/son/grandson1
TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/father/son/grandson1', stat=198540,198543,1640857267893,1640857267907,1,1,0,0,8,1,198541
, data=[110, 101, 119, 32, 100, 97, 116, 97]}}

很显然该监听器监听缓存中的所有节点(需要知道这些节点发生事件的数据)。

获取缓存数据
        if(curator.checkExists().forPath("/") != null) {
            curator.delete().deletingChildrenIfNeeded().forPath("/");
        }

        CuratorCache cache = CuratorCache.build(curator, "/");

        cache.start();
    
        // 创建一系列节点
        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1", "data".getBytes());

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1/test", "test".getBytes());

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1/test/test2", "test2".getBytes());

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/kaven", "kaven".getBytes());

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/jojo", "jojo".getBytes());

        // 获取指定路径的节点数据
        Optional childData = cache.get("/father/son/grandson1");
        if(childData.isPresent()) System.out.println(childData);

        // 使用stream将缓存的节点按数据值(字符串)从小到大,如果数据值相等按路径(字符串)从小到大排序
        // 收集为List类型
        List childDataList = cache.stream()
                .sorted((childData1, childData2) -> {
                    int dataCompare = new String(childData1.getData()).compareTo(new String(childData2.getData()));
                    if(dataCompare != 0) return dataCompare;
                    else return childData1.getPath().compareTo(childData2.getPath());
                })
                .collect(Collectors.toList());

        System.out.println(childDataList.size());
        // 遍历在缓存中获得的排序后的节点数据
        childDataList.forEach(child -> {
            System.out.print(child.getPath() + " : ");
            if(child.getData().length != 0) System.out.println(new String(child.getData()));
            // 如果节点没有存储数据,输出[blank]
            else System.out.println("[blank]");
        });

输出:

Optional[ChildData{path='/father/son/grandson1', stat=198753,198753,1640920752605,1640920752605,0,1,0,0,4,1,198754
, data=[100, 97, 116, 97]}]
8
/ : [blank]
/father : [blank]
/father/son : [blank]
/father/son/grandson1 : data
/jojo : jojo
/kaven : kaven
/father/son/grandson1/test : test
/father/son/grandson1/test/test2 : test2

输出符合预期。

Curator框架的数据缓存与监听CuratorCache就介绍就到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/696361.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号