在上一篇博客中,博主介绍了Watcher API的使用:
- ZooKeeper :Java客户端Watcher API介绍
这篇博客,博主给大家介绍如何执行批量任务和Transaction API的使用。为什么需要执行批量任务的功能?原因也很简单,因为可以有效的降低处理多任务时的耗时,主要是可以减少客户端与服务端的连接、验证以及数据传输等耗时操作的次数(当然批量任务的数量也不能太多,不然可能会因为网络问题导致丢包和重传,得不偿失)。或者基于某些业务,几个操作必须被原子式地执行。
事务(Transaction)对于执行批量任务就显得尤为重要了,因为可能会出现批量任务不能全部执行成功的情况,比如客户端创建一个临时顺序节点用于获取分布式锁,一般会先创建这个临时顺序节点,再让该节点监听上一个临时顺序节点(如删除事件),当上一个临时顺序节点的客户端处理完业务关闭连接或者宕机导致Session超时后,该客户端的节点会被服务端删除,这样后一个临时顺序节点就会监听到该事件,这样它就可以去获取分布式锁了;如果创建临时顺序节点的操作执行成功,而监听上一个临时顺序节点的操作执行失败,可能会导致分布式锁失效(创建的临时顺序节点一直获取不到锁,因为它没有监听前一个同类节点,从而造成连锁反应)。而事务可以将批量任务封装成原子操作,因此执行该批量任务就像执行原子操作一样,只会出现两种结果:成功与失败。
但ZooKeeper的Transaction API并不是意义上的事务,它只是执行批量任务的一个简单封装,并且还是基于特定种类的操作(下面会进行介绍)。
Transaction类:
@InterfaceAudience.Public
public class Transaction {
// ZooKeeper实例,通过它执行ops(批量任务)
private ZooKeeper zk;
// 批量任务
private List ops = new ArrayList();
protected Transaction(ZooKeeper zk) {
this.zk = zk;
}
// 只能是更新操作,因此只有create、delete、check、setData这几个操作
public Transaction create(final String path, byte[] data, List acl, CreateMode createMode) {
ops.add(Op.create(path, data, acl, createMode.toFlag()));
return this;
}
public Transaction delete(final String path, int version) {
ops.add(Op.delete(path, version));
return this;
}
public Transaction check(String path, int version) {
ops.add(Op.check(path, version));
return this;
}
public Transaction setData(final String path, byte[] data, int version) {
ops.add(Op.setData(path, data, version));
return this;
}
// 还是通过调用multi()方法来执行批量任务,Transaction类中并没有实现事务的功能
public List commit() throws InterruptedException, KeeperException {
return zk.multi(ops);
}
// commit的异步版本
public void commit(MultiCallback cb, Object ctx) {
zk.multi(ops, cb, ctx);
}
}
而这些不同种类的操作在Op抽象类中进行了定义(删除了大部分代码):
public abstract class Op {
// 操作种类,Transaction实例只能添加OpKind.TRANSACTION类的操作
public enum OpKind {
TRANSACTION,
READ
}
private int type;
private String path;
private OpKind opKind;
// 默认是OpKind.TRANSACTION类操作
public static class Create extends Op {
protected byte[] data;
protected List acl;
protected int flags;
}
// 默认是OpKind.TRANSACTION类操作
public static class CreateTTL extends Create {}
// 默认是OpKind.TRANSACTION类操作
public static class Delete extends Op {
private int version;
}
// 默认是OpKind.TRANSACTION类操作
public static class SetData extends Op {
private byte[] data;
private int version;
}
// 默认是OpKind.TRANSACTION类操作
public static class Check extends Op {
private int version;
}
// 默认是OpKind.READ类操作
public static class GetChildren extends Op {
GetChildren(String path) {
super(ZooDefs.OpCode.getChildren, path, OpKind.READ);
}
}
// 默认是OpKind.READ类操作
public static class GetData extends Op {
GetData(String path) {
super(ZooDefs.OpCode.getData, path, OpKind.READ);
}
}
}
Create 、Delete 、SetData 以及Check 类都是通过如下图所示的方式,在实例化时调用父类构造器将OpKind.TRANSACTION作为它们实例的opKind。
而CreateTTL类则是在实例化时调用了父类Create的构造器,因此CreateTTL类也是将OpKind.TRANSACTION作为它们实例的opKind。
因此,可以知道Transaction类并没有实现事务的相应功能,只是提供了一个仅用于OpKind.TRANSACTION类操作的批量任务处理接口。而实际执行这些任务的是ZooKeeper类中的multi方法。
public List multi(Iterable ops) throws InterruptedException, KeeperException {
for (Op op : ops) {
op.validate();
}
return multiInternal(generateMultiTransaction(ops));
}
// multi的异步版本
public void multi(Iterable ops, MultiCallback cb, Object ctx) {
List results = validatePath(ops);
if (results.size() > 0) {
cb.processResult(KeeperException.Code.BADARGUMENTS.intValue(), null, ctx, results);
return;
}
multiInternal(generateMultiTransaction(ops), cb, ctx);
}
注意:multi方法只能执行同一种OpKind类型的操作。
测试类:
package com.kaven.zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class Application implements Watcher {
private static CountDownLatch latch;
private static final String SERVER_PROXY = "192.168.1.184:9000";
private static final int TIMEOUT = 40000;
private static long time;
private String watcherName;
protected Application(String watcherName) {
this.watcherName = watcherName;
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
Watcher connectWatcher = new Application("connectWatcher");
latch = new CountDownLatch(1);
time = System.currentTimeMillis();
ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, connectWatcher);
latch.await();
System.out.println(zk.getState());
System.out.println("Connection complete!");
String transactionMessage = "transaction success";
Transaction transaction = zk.transaction()
.create("/transaction", "transaction data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
.setData("/transaction", "new data".getBytes(), -1)
.check("/transaction", 100)
.create("/transaction2", "transaction2 data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
transaction.commit(
(rc, path, ctx, opResults) -> {
System.out.println(KeeperException.Code.get(rc).name());
opResults.forEach(Application::printOpResult);
if(rc == KeeperException.Code.OK.intValue()) {
System.out.println(ctx);
}
},
transactionMessage
);
String multiMessage = "multi success";
List opList = new ArrayList<>();
opList.add(Op.create("/multi", "multi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
opList.add(Op.check("/multi", 1));
opList.add(Op.setData("/multi", "new data".getBytes(), -1));
opList.add(Op.create("/multi/son", "multi son".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL));
opList.add(Op.check("/multi/son", 0));
opList.add(Op.delete("/multi/son", -1));
zk.multi(opList,
(rc, path, ctx, opResults) -> {
System.out.println(KeeperException.Code.get(rc).name());
opResults.forEach(Application::printOpResult);
if(rc == KeeperException.Code.OK.intValue()) {
System.out.println(ctx);
}
},
multiMessage
);
Thread.sleep(1000000);
}
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("-----------------WatchedEvent------------------");
System.out.println(this.watcherName);
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getState().name());
System.out.println(watchedEvent.getPath());
System.out.println("time use(ms):" + (System.currentTimeMillis() - time));
time = System.currentTimeMillis();
System.out.println("-----------------WatchedEvent------------------");
if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
latch.countDown();
}
}
public static void printOpResult(OpResult opResult) {
System.out.println("-----------------printOpResult------------------");
if(opResult instanceof OpResult.CreateResult) {
System.out.println("CreateResult");
System.out.println(((OpResult.CreateResult) opResult).getPath());
}
else if(opResult instanceof OpResult.CheckResult) {
System.out.println("CheckResult");
}
else if(opResult instanceof OpResult.GetDataResult) {
System.out.println("GetDataResult");
System.out.println(new String(((OpResult.GetDataResult) opResult).getData()));
}
else if(opResult instanceof OpResult.SetDataResult) {
System.out.println("SetDataResult");
System.out.println(((OpResult.SetDataResult) opResult).getStat().getDataLength());
}
else if(opResult instanceof OpResult.GetChildrenResult) {
System.out.println("GetChildrenResult");
((OpResult.GetChildrenResult) opResult).getChildren().forEach(System.out::println);
}
else if(opResult instanceof OpResult.DeleteResult) {
System.out.println("DeleteResult");
}
else if(opResult instanceof OpResult.ErrorResult) {
System.out.println("ErrorResult");
int errorCode = ((OpResult.ErrorResult) opResult).getErr();
System.out.println(KeeperException.Code.get(errorCode).name());
}
System.out.println("-----------------printOpResult------------------");
}
}
输出结果:
-----------------WatchedEvent------------------ connectWatcher None SyncConnected null time use(ms):13703 -----------------WatchedEvent------------------ ConNECTED Connection complete! BADVERSION -----------------printOpResult------------------ ErrorResult OK -----------------printOpResult------------------ -----------------printOpResult------------------ ErrorResult OK -----------------printOpResult------------------ -----------------printOpResult------------------ ErrorResult BADVERSION -----------------printOpResult------------------ -----------------printOpResult------------------ ErrorResult RUNTIMEINConSISTENCY -----------------printOpResult------------------ BADVERSION -----------------printOpResult------------------ ErrorResult OK -----------------printOpResult------------------ -----------------printOpResult------------------ ErrorResult BADVERSION -----------------printOpResult------------------ -----------------printOpResult------------------ ErrorResult RUNTIMEINConSISTENCY -----------------printOpResult------------------ -----------------printOpResult------------------ ErrorResult RUNTIMEINConSISTENCY -----------------printOpResult------------------ -----------------printOpResult------------------ ErrorResult RUNTIMEINConSISTENCY -----------------printOpResult------------------ -----------------printOpResult------------------ ErrorResult RUNTIMEINConSISTENCY -----------------printOpResult------------------
使用commit方法和multi方法来执行批量任务是类似的,毕竟commit方法就是调用multi方法来执行批量任务的:
public Listcommit() throws InterruptedException, KeeperException { return zk.multi(ops); } // 异步版 public void commit(MultiCallback cb, Object ctx) { zk.multi(ops, cb, ctx); }
调用multi方法执行批量任务能保证批量任务的原子性吗?Java客户端还在运行(会话还没结束):
而在其他客户端上并没有查询到/transaction和/multi这两个节点(但创建/transaction和/multi这两个节点的操作的返回码都是OK,很显然被回滚了):
很明显两次批量任务执行失败都是因为check操作执行不成功导致的:
check("/transaction", 100)
opList.add(Op.check("/multi", 1));
如果删除这两个操作,两次批量任务都会执行成功:
-----------------WatchedEvent------------------ connectWatcher None SyncConnected null time use(ms):13755 -----------------WatchedEvent------------------ ConNECTED Connection complete! OK -----------------printOpResult------------------ CreateResult /transaction -----------------printOpResult------------------ -----------------printOpResult------------------ SetDataResult 8 -----------------printOpResult------------------ -----------------printOpResult------------------ CreateResult /transaction2 -----------------printOpResult------------------ transaction success OK -----------------printOpResult------------------ CreateResult /multi -----------------printOpResult------------------ -----------------printOpResult------------------ SetDataResult 8 -----------------printOpResult------------------ -----------------printOpResult------------------ CreateResult /multi/son -----------------printOpResult------------------ -----------------printOpResult------------------ CheckResult -----------------printOpResult------------------ -----------------printOpResult------------------ DeleteResult -----------------printOpResult------------------ multi success
如下所示的输出,是批量任务中的某个操作执行失败导致的:
// 前面的操作执行成功了(后面的操作执行失败后回滚,才能保证事务) -----------------printOpResult------------------ ErrorResult OK -----------------printOpResult------------------ // 执行该操作失败了,版本不匹配(BADVERSION),就是在检查版本时,输入的版本与节点的版本不匹配 -----------------printOpResult------------------ ErrorResult BADVERSION -----------------printOpResult------------------ // 后面的操作不需要再执行,执行错误的原因都是:RUNTIMEINCONSISTENCY -----------------printOpResult------------------ ErrorResult RUNTIMEINConSISTENCY -----------------printOpResult------------------
为什么说multi方法只能执行同一种OpKind类型的操作。如果multi方法执行如下操作(同时存在OpKind.TRANSACTION类和OpKind.READ类操作):
ListopList = new ArrayList<>(); opList.add(Op.create("/multi", "multi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); opList.add(Op.setData("/multi", "new data".getBytes(), -1)); opList.add(Op.getData("/multi")); opList.add(Op.create("/multi/son", "multi son".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)); opList.add(Op.check("/multi/son", 0)); opList.add(Op.delete("/multi/son", -1));
会报错(multi请求中不允许混合读写操作):
private void setOrCheckOpKind(Op.OpKind ok) throws IllegalArgumentException {
if (opKind == null) {
opKind = ok;
} else if (ok != opKind) {
throw new IllegalArgumentException("Mixing read and write operations (transactions)"
+ " is not allowed in a multi request.");
}
}
在setOrCheckOpKind方法中会检查每个操作的OpKind类型,opKind会保存操作的OpKind类型,如果出现OpKind类型不同的操作,就会抛出异常(进入else if分支)。
Java客户端执行批量任务和Transaction API介绍就到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。



