栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ZooKeeper : Curator框架之共享计数器DistributedAtomicLong

ZooKeeper : Curator框架之共享计数器DistributedAtomicLong

DistributedAtomicLong

原子增量操作的计数器,首先尝试使用乐观锁进行增量操作,如果失败,则采用可选的InterProcessMutex(悲观锁)进行增量操作。 对于乐观锁和悲观锁,重试策略都用于重试增量操作。

各种增量方法都会返回一个AtomicValue实例,通过调用AtomicValue实例的succeeded()可以查询增量操作是否执行成功,除了get() 外,其他任何方法都不保证一定成功。

AtomicValue接口源码(原子操作返回值的抽象):

public interface AtomicValue
{
    
    public boolean      succeeded();

    
    public T            prevalue();

    
    public T            postValue();

    
    public AtomicStats  getStats();
}

DistributedAtomicLong类中的内部类AtomicLong实现了AtomicValue接口,但实际上只是起到封装的作用,所有的调用都委托给了bytes属性(其他实现类的实例)。

    private class AtomicLong implements AtomicValue
    {
        private AtomicValue bytes;

        private AtomicLong(AtomicValue bytes)
        {
            this.bytes = bytes;
        }

        @Override
        public boolean succeeded()
        {
            return bytes.succeeded();
        }

        @Override
        public Long prevalue()
        {
            return bytesToValue(bytes.prevalue());
        }

        @Override
        public Long postValue()
        {
            return bytesToValue(bytes.postValue());
        }

        @Override
        public AtomicStats getStats()
        {
            return bytes.getStats();
        }
    }

DistributedAtomicLong类实现了DistributedAtomicNumber接口,并且DistributedAtomicLong将各种原子操作的执行委托给了DistributedAtomicValue。

public class DistributedAtomicLong implements DistributedAtomicNumber
{
    private final DistributedAtomicValue        value;
    ...
}

DistributedAtomicNumber接口是分布式原子数值类型的抽象,定义了分布式原子数值类型需要提供的方法。

public interface DistributedAtomicNumber
{
    public AtomicValue get() throws Exception;
    public AtomicValue compareAndSet(T expectedValue, T newValue) throws Exception;
    public AtomicValue trySet(T newValue) throws Exception;
    public boolean initialize(T value) throws Exception;
    public void forceSet(T newValue) throws Exception;
    public AtomicValue increment() throws Exception;
    public AtomicValue decrement() throws Exception;
    public AtomicValue add(T delta) throws Exception;
    public AtomicValue subtract(T delta) throws Exception;
}

目前DistributedAtomicNumber接口有两种实现,除了DistributedAtomicLong类,还有DistributedAtomicInteger类。

并且DistributedAtomicInteger也是将各种原子操作的执行委托给了DistributedAtomicValue,所以这两种实现是类似的,只不过表示的数值类型不同而已。

public class DistributedAtomicInteger implements DistributedAtomicNumber
{
    private final DistributedAtomicValue        value;
    ...
}

DistributedAtomicValue是原子操作真正的执行者,因此可以知道内部类AtomicLong的bytes属性是MutableAtomicValue实例。

    public AtomicValue     get() throws Exception
    {
        MutableAtomicValue  result = new MutableAtomicValue(null, null, false);
        ...
        return result;
    }
测试

pom.xml:



    4.0.0

    com.kaven
    zookeeper
    1.0-SNAPSHOT

    
        8
        8
    

    
        
            org.apache.curator
            curator-recipes
            5.2.0
        
        
            org.projectlombok
            lombok
            1.18.22
        
    

CuratorframeworkProperties类(提供Curatorframework需要的一些配置信息,以及创建Curatorframework实例的方法):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.imps.CuratorframeworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorframeworkProperties {
    // 连接地址
    public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
    // 连接超时时间
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session超时时间
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空间
    public static final String NAMESPACE = "MyNamespace";
    // 重试策略
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);

    public static Curatorframework getCuratorframework() {
        // 创建Curatorframework实例
        Curatorframework curator = CuratorframeworkFactory.builder()
                .connectString(CuratorframeworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorframeworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorframeworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorframeworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorframeworkProperties.NAMESPACE)
                .build();
        curator.start();
        assert curator.getState().equals(CuratorframeworkState.STARTED);
        return curator;
    }
}

DistributedAtomicLongRunnable类(实现了Runnable接口,模拟分布式节点操作分布式原子长整型):

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.RetryNTimes;

public class DistributedAtomicLongRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的Curatorframework实例,表示不同的分布式节点
        Curatorframework curator = CuratorframeworkProperties.getCuratorframework();

        // 共享计数器的路径
        String counterPath = "/kaven";

        // 创建DistributedAtomicLong实例,用于操作分布式原子长整型
        // new RetryNTimes(100, 5)是乐观锁的重试策略实例
        DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
                new RetryNTimes(100, 5));

        // 初始化
        boolean initialize = atomicLong.initialize(100L);
        if(initialize) {
            System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
        }
        else {
            System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失败");
        }

        // 比较再设置,当Zookeeper中的值与期望值相等时才能设置新值
        AtomicValue longAtomicValue = atomicLong.compareAndSet(100L, 501L);
        if(longAtomicValue.succeeded()) {
            System.out.println(Thread.currentThread().getName() + " compareAndSet 成功");
        }
        else {
            System.out.println(Thread.currentThread().getName() + " compareAndSet 失败");
        }
    }
}

启动类:

package com.kaven.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 分布式节点处理业务
        for (int i = 0; i < 15; i++) {
            EXECUTOR_SERVICE.execute(new DistributedAtomicLongRunnable());
        }
    }
}

模拟15个分布式节点操作分布式原子长整型,输出如下所示:

pool-1-thread-12初始化 atomicLong 成功
pool-1-thread-11初始化 atomicLong 失败
pool-1-thread-10初始化 atomicLong 失败
pool-1-thread-14初始化 atomicLong 失败
pool-1-thread-15初始化 atomicLong 失败
pool-1-thread-8初始化 atomicLong 失败
pool-1-thread-13初始化 atomicLong 失败
pool-1-thread-6初始化 atomicLong 失败
pool-1-thread-1初始化 atomicLong 失败
pool-1-thread-7初始化 atomicLong 失败
pool-1-thread-5初始化 atomicLong 失败
pool-1-thread-3初始化 atomicLong 失败
pool-1-thread-9初始化 atomicLong 失败
pool-1-thread-2初始化 atomicLong 失败
pool-1-thread-4初始化 atomicLong 失败
pool-1-thread-8 compareAndSet 失败
pool-1-thread-14 compareAndSet 失败
pool-1-thread-10 compareAndSet 失败
pool-1-thread-6 compareAndSet 失败
pool-1-thread-15 compareAndSet 失败
pool-1-thread-13 compareAndSet 失败
pool-1-thread-7 compareAndSet 失败
pool-1-thread-9 compareAndSet 失败
pool-1-thread-11 compareAndSet 失败
pool-1-thread-5 compareAndSet 失败
pool-1-thread-12 compareAndSet 失败
pool-1-thread-1 compareAndSet 失败
pool-1-thread-3 compareAndSet 成功
pool-1-thread-4 compareAndSet 失败
pool-1-thread-2 compareAndSet 失败

输出是符合预期的,两种操作都只有一个节点执行成功。DistributedAtomicValue类的initialize和compareAndSet方法如下所示,其实就是创建Zookeeper节点(只有一个服务能创建成功)和基于版本设置节点的值(在博主的测试程序中,也只能有一个服务将该操作执行成功),而这两种操作并没有使用锁(乐观锁和悲观锁)。

    public boolean initialize(byte[] value) throws Exception
    {
        try
        {
            client.create().creatingParentContainersIfNeeded().forPath(path, value);
        }
        catch ( KeeperException.NodeExistsException ignore )
        {
            // ignore
            return false;
        }
        return true;
    }
    
    public AtomicValue compareAndSet(byte[] expectedValue, byte[] newValue) throws Exception
    {
        Stat                        stat = new Stat();
        MutableAtomicValue  result = new MutableAtomicValue(null, null, false);
        boolean                     createIt = getCurrentValue(result, stat);
        if ( !createIt && Arrays.equals(expectedValue, result.prevalue) )
        {
            try
            {
                client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
                result.succeeded = true;
                result.postValue = newValue;
            }
            catch ( KeeperException.BadVersionException dummy )
            {
                result.succeeded = false;
            }
            catch ( KeeperException.NoNodeException dummy )
            {
                result.succeeded = false;
            }
        }
        else
        {
            result.succeeded = false;
        }
        return result;
    }

increment、decrement、add以及subtract这四种操作是类似的,博主只演示increment操作。

        DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
                new RetryNTimes(100, 5));

        boolean initialize = atomicLong.initialize(100L);
        if(initialize) {
            System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
        }
        else {
            System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失败");
        }

        for (int i = 0; i < 1000; i++) {
            Thread.sleep(5);
            atomicLong.increment();
        }
        System.out.println(Thread.currentThread().getName() + "操作成功");
        System.out.println(Thread.currentThread().getName() + "当前的值为" + atomicLong.get().postValue());

输出如下所示:

pool-1-thread-8初始化 atomicLong 失败
pool-1-thread-1初始化 atomicLong 失败
pool-1-thread-3初始化 atomicLong 失败
pool-1-thread-14初始化 atomicLong 失败
pool-1-thread-5初始化 atomicLong 失败
pool-1-thread-12初始化 atomicLong 成功
pool-1-thread-2初始化 atomicLong 失败
pool-1-thread-4初始化 atomicLong 失败
pool-1-thread-15初始化 atomicLong 失败
pool-1-thread-13初始化 atomicLong 失败
pool-1-thread-11初始化 atomicLong 失败
pool-1-thread-9初始化 atomicLong 失败
pool-1-thread-7初始化 atomicLong 失败
pool-1-thread-6初始化 atomicLong 失败
pool-1-thread-10初始化 atomicLong 失败
pool-1-thread-15操作成功
pool-1-thread-15当前的值为14289
pool-1-thread-3操作成功
pool-1-thread-3当前的值为14305
pool-1-thread-13操作成功
pool-1-thread-13当前的值为14420
pool-1-thread-2操作成功
pool-1-thread-2当前的值为14681
pool-1-thread-4操作成功
pool-1-thread-4当前的值为14876
pool-1-thread-1操作成功
pool-1-thread-1当前的值为14906
pool-1-thread-5操作成功
pool-1-thread-5当前的值为14953
pool-1-thread-8操作成功
pool-1-thread-8当前的值为14972
pool-1-thread-14操作成功
pool-1-thread-14当前的值为15001
pool-1-thread-7操作成功
pool-1-thread-7当前的值为15020
pool-1-thread-10操作成功
pool-1-thread-10当前的值为15051
pool-1-thread-11操作成功
pool-1-thread-11当前的值为15053
pool-1-thread-9操作成功
pool-1-thread-9当前的值为15060
pool-1-thread-12操作成功
pool-1-thread-12当前的值为15093
pool-1-thread-6操作成功
pool-1-thread-6当前的值为15100

最后的值为15100符合预期。increment、decrement、add以及subtract这四个方法都调用worker方法来完成。

    @Override
    public AtomicValue    increment() throws Exception
    {
        return worker(1L);
    }

    @Override
    public AtomicValue    decrement() throws Exception
    {
        return worker(-1L);
    }

    @Override
    public AtomicValue    add(Long delta) throws Exception
    {
        return worker(delta);
    }

    @Override
    public AtomicValue subtract(Long delta) throws Exception
    {
        return worker(-1 * delta);
    }

DistributedAtomicLong类的worker方法则是调用DistributedAtomicValue类的trySet方法来完成。

    private AtomicValue   worker(final Long addAmount) throws Exception
    {
        Preconditions.checkNotNull(addAmount, "addAmount cannot be null");

        Makevalue               makevalue = new Makevalue()
        {
            @Override
            public byte[] makeFrom(byte[] previous)
            {
                long        previousValue = (previous != null) ? bytesToValue(previous) : 0;
                long        newValue = previousValue + addAmount;
                return valueToBytes(newValue);
            }
        };

        AtomicValue     result = value.trySet(makevalue);
        return new AtomicLong(result);
    }

DistributedAtomicValue类的trySet方法尝试以原子方式将计数器的值设置为给定值,首先尝试使用乐观锁进行操作,如果失败,则采用可选的InterProcessMutex(悲观锁)进行操作。

    // 尝试以原子方式将计数器的值设置为给定值
    public AtomicValue   trySet(final byte[] newValue) throws Exception
    {
        MutableAtomicValue  result = new MutableAtomicValue(null, null, false);

        Makevalue                   makevalue = new Makevalue()
        {
            @Override
            public byte[] makeFrom(byte[] previous)
            {
                return newValue;
            }
        };
        // 尝试使用乐观锁
        tryOptimistic(result, makevalue);
        if ( !result.succeeded() && (mutex != null) )
        {
            // 如果在乐观锁下执行不成功,并且有悲观锁
            // 尝试使用悲观锁
            tryWithMutex(result, makevalue);
        }

        return result;
    }

DistributedAtomicLong类的trySet方法用于尝试设置计数器的值,也是通过调用DistributedAtomicValue类的trySet方法来完成。

    @Override
    public AtomicValue   trySet(Long newValue) throws Exception
    {
        return new AtomicLong(value.trySet(valueToBytes(newValue)));
    }

DistributedAtomicLong类的forceSet方法用于强制设置计数器的值,通过调用DistributedAtomicValue类的forceSet方法来完成。

    @Override
    public void forceSet(Integer newValue) throws Exception
    {
        value.forceSet(valueToBytes(newValue));
    }

DistributedAtomicValue类的forceSet方法如下所示,就是直接设置Zookeeper节点的值。

    
    public void forceSet(byte[] newValue) throws Exception
    {
        try
        {
            client.setData().forPath(path, newValue);
        }
        catch ( KeeperException.NoNodeException dummy )
        {
            try
            {
                client.create().creatingParentContainersIfNeeded().forPath(path, newValue);
            }
            catch ( KeeperException.NodeExistsException dummy2 )
            {
                client.setData().forPath(path, newValue);
            }
        }
    }

这些方法比较简单,博主就不演示了。

给DistributedAtomicLong设置悲观锁可以如下所示进行操作:

        PromotedToLock promotedToLock = PromotedToLock.builder()
                 // 用于分布式锁的Zookeeper路径 
                .lockPath("/lock")
                // 锁的重试策略
                .retryPolicy(new RetryNTimes(100, 5))
                // 锁的超时时间
                .timeout(10000, TimeUnit.SECONDS)
                .build();

        DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
                new RetryNTimes(100, 5),
                promotedToLock);

Curator框架的共享计数器DistributedAtomicLong就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/711749.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号