原子增量操作的计数器,首先尝试使用乐观锁进行增量操作,如果失败,则采用可选的InterProcessMutex(悲观锁)进行增量操作。 对于乐观锁和悲观锁,重试策略都用于重试增量操作。
各种增量方法都会返回一个AtomicValue实例,通过调用AtomicValue实例的succeeded()可以查询增量操作是否执行成功,除了get() 外,其他任何方法都不保证一定成功。
AtomicValue接口源码(原子操作返回值的抽象):
public interface AtomicValue{ public boolean succeeded(); public T prevalue(); public T postValue(); public AtomicStats getStats(); }
DistributedAtomicLong类中的内部类AtomicLong实现了AtomicValue接口,但实际上只是起到封装的作用,所有的调用都委托给了bytes属性(其他实现类的实例)。
private class AtomicLong implements AtomicValue{ private AtomicValue bytes; private AtomicLong(AtomicValue bytes) { this.bytes = bytes; } @Override public boolean succeeded() { return bytes.succeeded(); } @Override public Long prevalue() { return bytesToValue(bytes.prevalue()); } @Override public Long postValue() { return bytesToValue(bytes.postValue()); } @Override public AtomicStats getStats() { return bytes.getStats(); } }
DistributedAtomicLong类实现了DistributedAtomicNumber接口,并且DistributedAtomicLong将各种原子操作的执行委托给了DistributedAtomicValue。
public class DistributedAtomicLong implements DistributedAtomicNumber{ private final DistributedAtomicValue value; ... }
DistributedAtomicNumber接口是分布式原子数值类型的抽象,定义了分布式原子数值类型需要提供的方法。
public interface DistributedAtomicNumber{ public AtomicValue get() throws Exception; public AtomicValue compareAndSet(T expectedValue, T newValue) throws Exception; public AtomicValue trySet(T newValue) throws Exception; public boolean initialize(T value) throws Exception; public void forceSet(T newValue) throws Exception; public AtomicValue increment() throws Exception; public AtomicValue decrement() throws Exception; public AtomicValue add(T delta) throws Exception; public AtomicValue subtract(T delta) throws Exception; }
目前DistributedAtomicNumber接口有两种实现,除了DistributedAtomicLong类,还有DistributedAtomicInteger类。
并且DistributedAtomicInteger也是将各种原子操作的执行委托给了DistributedAtomicValue,所以这两种实现是类似的,只不过表示的数值类型不同而已。
public class DistributedAtomicInteger implements DistributedAtomicNumber{ private final DistributedAtomicValue value; ... }
DistributedAtomicValue是原子操作真正的执行者,因此可以知道内部类AtomicLong的bytes属性是MutableAtomicValue实例。
public AtomicValue测试get() throws Exception { MutableAtomicValue result = new MutableAtomicValue (null, null, false); ... return result; }
pom.xml:
4.0.0 com.kaven zookeeper 1.0-SNAPSHOT 8 8 org.apache.curator curator-recipes 5.2.0 org.projectlombok lombok 1.18.22
CuratorframeworkProperties类(提供Curatorframework需要的一些配置信息,以及创建Curatorframework实例的方法):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.CuratorframeworkFactory;
import org.apache.curator.framework.imps.CuratorframeworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorframeworkProperties {
// 连接地址
public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
// 连接超时时间
public static final int CONNECTION_TIMEOUT_MS = 40000;
// Session超时时间
public static final int SESSION_TIMEOUT_MS = 10000;
// 命名空间
public static final String NAMESPACE = "MyNamespace";
// 重试策略
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
public static Curatorframework getCuratorframework() {
// 创建Curatorframework实例
Curatorframework curator = CuratorframeworkFactory.builder()
.connectString(CuratorframeworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorframeworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorframeworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorframeworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorframeworkProperties.NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorframeworkState.STARTED);
return curator;
}
}
DistributedAtomicLongRunnable类(实现了Runnable接口,模拟分布式节点操作分布式原子长整型):
package com.kaven.zookeeper;
import lombok.SneakyThrows;
import org.apache.curator.framework.Curatorframework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.RetryNTimes;
public class DistributedAtomicLongRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的Curatorframework实例,表示不同的分布式节点
Curatorframework curator = CuratorframeworkProperties.getCuratorframework();
// 共享计数器的路径
String counterPath = "/kaven";
// 创建DistributedAtomicLong实例,用于操作分布式原子长整型
// new RetryNTimes(100, 5)是乐观锁的重试策略实例
DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5));
// 初始化
boolean initialize = atomicLong.initialize(100L);
if(initialize) {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
}
else {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失败");
}
// 比较再设置,当Zookeeper中的值与期望值相等时才能设置新值
AtomicValue longAtomicValue = atomicLong.compareAndSet(100L, 501L);
if(longAtomicValue.succeeded()) {
System.out.println(Thread.currentThread().getName() + " compareAndSet 成功");
}
else {
System.out.println(Thread.currentThread().getName() + " compareAndSet 失败");
}
}
}
启动类:
package com.kaven.zookeeper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
// 分布式节点处理业务
for (int i = 0; i < 15; i++) {
EXECUTOR_SERVICE.execute(new DistributedAtomicLongRunnable());
}
}
}
模拟15个分布式节点操作分布式原子长整型,输出如下所示:
pool-1-thread-12初始化 atomicLong 成功 pool-1-thread-11初始化 atomicLong 失败 pool-1-thread-10初始化 atomicLong 失败 pool-1-thread-14初始化 atomicLong 失败 pool-1-thread-15初始化 atomicLong 失败 pool-1-thread-8初始化 atomicLong 失败 pool-1-thread-13初始化 atomicLong 失败 pool-1-thread-6初始化 atomicLong 失败 pool-1-thread-1初始化 atomicLong 失败 pool-1-thread-7初始化 atomicLong 失败 pool-1-thread-5初始化 atomicLong 失败 pool-1-thread-3初始化 atomicLong 失败 pool-1-thread-9初始化 atomicLong 失败 pool-1-thread-2初始化 atomicLong 失败 pool-1-thread-4初始化 atomicLong 失败 pool-1-thread-8 compareAndSet 失败 pool-1-thread-14 compareAndSet 失败 pool-1-thread-10 compareAndSet 失败 pool-1-thread-6 compareAndSet 失败 pool-1-thread-15 compareAndSet 失败 pool-1-thread-13 compareAndSet 失败 pool-1-thread-7 compareAndSet 失败 pool-1-thread-9 compareAndSet 失败 pool-1-thread-11 compareAndSet 失败 pool-1-thread-5 compareAndSet 失败 pool-1-thread-12 compareAndSet 失败 pool-1-thread-1 compareAndSet 失败 pool-1-thread-3 compareAndSet 成功 pool-1-thread-4 compareAndSet 失败 pool-1-thread-2 compareAndSet 失败
输出是符合预期的,两种操作都只有一个节点执行成功。DistributedAtomicValue类的initialize和compareAndSet方法如下所示,其实就是创建Zookeeper节点(只有一个服务能创建成功)和基于版本设置节点的值(在博主的测试程序中,也只能有一个服务将该操作执行成功),而这两种操作并没有使用锁(乐观锁和悲观锁)。
public boolean initialize(byte[] value) throws Exception
{
try
{
client.create().creatingParentContainersIfNeeded().forPath(path, value);
}
catch ( KeeperException.NodeExistsException ignore )
{
// ignore
return false;
}
return true;
}
public AtomicValue compareAndSet(byte[] expectedValue, byte[] newValue) throws Exception
{
Stat stat = new Stat();
MutableAtomicValue result = new MutableAtomicValue(null, null, false);
boolean createIt = getCurrentValue(result, stat);
if ( !createIt && Arrays.equals(expectedValue, result.prevalue) )
{
try
{
client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
result.succeeded = true;
result.postValue = newValue;
}
catch ( KeeperException.BadVersionException dummy )
{
result.succeeded = false;
}
catch ( KeeperException.NoNodeException dummy )
{
result.succeeded = false;
}
}
else
{
result.succeeded = false;
}
return result;
}
increment、decrement、add以及subtract这四种操作是类似的,博主只演示increment操作。
DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5));
boolean initialize = atomicLong.initialize(100L);
if(initialize) {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
}
else {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失败");
}
for (int i = 0; i < 1000; i++) {
Thread.sleep(5);
atomicLong.increment();
}
System.out.println(Thread.currentThread().getName() + "操作成功");
System.out.println(Thread.currentThread().getName() + "当前的值为" + atomicLong.get().postValue());
输出如下所示:
pool-1-thread-8初始化 atomicLong 失败 pool-1-thread-1初始化 atomicLong 失败 pool-1-thread-3初始化 atomicLong 失败 pool-1-thread-14初始化 atomicLong 失败 pool-1-thread-5初始化 atomicLong 失败 pool-1-thread-12初始化 atomicLong 成功 pool-1-thread-2初始化 atomicLong 失败 pool-1-thread-4初始化 atomicLong 失败 pool-1-thread-15初始化 atomicLong 失败 pool-1-thread-13初始化 atomicLong 失败 pool-1-thread-11初始化 atomicLong 失败 pool-1-thread-9初始化 atomicLong 失败 pool-1-thread-7初始化 atomicLong 失败 pool-1-thread-6初始化 atomicLong 失败 pool-1-thread-10初始化 atomicLong 失败 pool-1-thread-15操作成功 pool-1-thread-15当前的值为14289 pool-1-thread-3操作成功 pool-1-thread-3当前的值为14305 pool-1-thread-13操作成功 pool-1-thread-13当前的值为14420 pool-1-thread-2操作成功 pool-1-thread-2当前的值为14681 pool-1-thread-4操作成功 pool-1-thread-4当前的值为14876 pool-1-thread-1操作成功 pool-1-thread-1当前的值为14906 pool-1-thread-5操作成功 pool-1-thread-5当前的值为14953 pool-1-thread-8操作成功 pool-1-thread-8当前的值为14972 pool-1-thread-14操作成功 pool-1-thread-14当前的值为15001 pool-1-thread-7操作成功 pool-1-thread-7当前的值为15020 pool-1-thread-10操作成功 pool-1-thread-10当前的值为15051 pool-1-thread-11操作成功 pool-1-thread-11当前的值为15053 pool-1-thread-9操作成功 pool-1-thread-9当前的值为15060 pool-1-thread-12操作成功 pool-1-thread-12当前的值为15093 pool-1-thread-6操作成功 pool-1-thread-6当前的值为15100
最后的值为15100符合预期。increment、decrement、add以及subtract这四个方法都调用worker方法来完成。
@Override
public AtomicValue increment() throws Exception
{
return worker(1L);
}
@Override
public AtomicValue decrement() throws Exception
{
return worker(-1L);
}
@Override
public AtomicValue add(Long delta) throws Exception
{
return worker(delta);
}
@Override
public AtomicValue subtract(Long delta) throws Exception
{
return worker(-1 * delta);
}
DistributedAtomicLong类的worker方法则是调用DistributedAtomicValue类的trySet方法来完成。
private AtomicValueworker(final Long addAmount) throws Exception { Preconditions.checkNotNull(addAmount, "addAmount cannot be null"); Makevalue makevalue = new Makevalue() { @Override public byte[] makeFrom(byte[] previous) { long previousValue = (previous != null) ? bytesToValue(previous) : 0; long newValue = previousValue + addAmount; return valueToBytes(newValue); } }; AtomicValue result = value.trySet(makevalue); return new AtomicLong(result); }
DistributedAtomicValue类的trySet方法尝试以原子方式将计数器的值设置为给定值,首先尝试使用乐观锁进行操作,如果失败,则采用可选的InterProcessMutex(悲观锁)进行操作。
// 尝试以原子方式将计数器的值设置为给定值
public AtomicValue trySet(final byte[] newValue) throws Exception
{
MutableAtomicValue result = new MutableAtomicValue(null, null, false);
Makevalue makevalue = new Makevalue()
{
@Override
public byte[] makeFrom(byte[] previous)
{
return newValue;
}
};
// 尝试使用乐观锁
tryOptimistic(result, makevalue);
if ( !result.succeeded() && (mutex != null) )
{
// 如果在乐观锁下执行不成功,并且有悲观锁
// 尝试使用悲观锁
tryWithMutex(result, makevalue);
}
return result;
}
DistributedAtomicLong类的trySet方法用于尝试设置计数器的值,也是通过调用DistributedAtomicValue类的trySet方法来完成。
@Override
public AtomicValue trySet(Long newValue) throws Exception
{
return new AtomicLong(value.trySet(valueToBytes(newValue)));
}
DistributedAtomicLong类的forceSet方法用于强制设置计数器的值,通过调用DistributedAtomicValue类的forceSet方法来完成。
@Override
public void forceSet(Integer newValue) throws Exception
{
value.forceSet(valueToBytes(newValue));
}
DistributedAtomicValue类的forceSet方法如下所示,就是直接设置Zookeeper节点的值。
public void forceSet(byte[] newValue) throws Exception
{
try
{
client.setData().forPath(path, newValue);
}
catch ( KeeperException.NoNodeException dummy )
{
try
{
client.create().creatingParentContainersIfNeeded().forPath(path, newValue);
}
catch ( KeeperException.NodeExistsException dummy2 )
{
client.setData().forPath(path, newValue);
}
}
}
这些方法比较简单,博主就不演示了。
给DistributedAtomicLong设置悲观锁可以如下所示进行操作:
PromotedToLock promotedToLock = PromotedToLock.builder()
// 用于分布式锁的Zookeeper路径
.lockPath("/lock")
// 锁的重试策略
.retryPolicy(new RetryNTimes(100, 5))
// 锁的超时时间
.timeout(10000, TimeUnit.SECONDS)
.build();
DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5),
promotedToLock);
Curator框架的共享计数器DistributedAtomicLong就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。



