栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

ZooKeeper :Java客户端执行批量任务和Transaction API介绍

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

ZooKeeper :Java客户端执行批量任务和Transaction API介绍

ZooKeeper :Java客户端执行批量任务和Transaction API介绍

在上一篇博客中,博主介绍了Watcher API的使用:

  • ZooKeeper :Java客户端Watcher API介绍

这篇博客,博主给大家介绍如何执行批量任务和Transaction API的使用。为什么需要执行批量任务的功能?原因也很简单,因为可以有效的降低处理多任务时的耗时,主要是可以减少客户端与服务端的连接、验证以及数据传输等耗时操作的次数(当然批量任务的数量也不能太多,不然可能会因为网络问题导致丢包和重传,得不偿失)。或者基于某些业务,几个操作必须被原子式地执行。

事务(Transaction)对于执行批量任务就显得尤为重要了,因为可能会出现批量任务不能全部执行成功的情况,比如客户端创建一个临时顺序节点用于获取分布式锁,一般会先创建这个临时顺序节点,再让该节点监听上一个临时顺序节点(如删除事件),当上一个临时顺序节点的客户端处理完业务关闭连接或者宕机导致Session超时后,该客户端的节点会被服务端删除,这样后一个临时顺序节点就会监听到该事件,这样它就可以去获取分布式锁了;如果创建临时顺序节点的操作执行成功,而监听上一个临时顺序节点的操作执行失败,可能会导致分布式锁失效(创建的临时顺序节点一直获取不到锁,因为它没有监听前一个同类节点,从而造成连锁反应)。而事务可以将批量任务封装成原子操作,因此执行该批量任务就像执行原子操作一样,只会出现两种结果:成功与失败。

但ZooKeeper的Transaction API并不是意义上的事务,它只是执行批量任务的一个简单封装,并且还是基于特定种类的操作(下面会进行介绍)。

Transaction类:

@InterfaceAudience.Public
public class Transaction {
   
    // ZooKeeper实例,通过它执行ops(批量任务)
    private ZooKeeper zk;
    // 批量任务
    private List ops = new ArrayList();

    protected Transaction(ZooKeeper zk) {
        this.zk = zk;
    }

    // 只能是更新操作,因此只有create、delete、check、setData这几个操作
    public Transaction create(final String path, byte[] data, List acl, CreateMode createMode) {
        ops.add(Op.create(path, data, acl, createMode.toFlag()));
        return this;
    }

    public Transaction delete(final String path, int version) {
        ops.add(Op.delete(path, version));
        return this;
    }

    public Transaction check(String path, int version) {
        ops.add(Op.check(path, version));
        return this;
    }

    public Transaction setData(final String path, byte[] data, int version) {
        ops.add(Op.setData(path, data, version));
        return this;
    }
    // 还是通过调用multi()方法来执行批量任务,Transaction类中并没有实现事务的功能
    public List commit() throws InterruptedException, KeeperException {
        return zk.multi(ops);
    }

    // commit的异步版本
    public void commit(MultiCallback cb, Object ctx) {
        zk.multi(ops, cb, ctx);
    }
}

而这些不同种类的操作在Op抽象类中进行了定义(删除了大部分代码):


public abstract class Op {

    // 操作种类,Transaction实例只能添加OpKind.TRANSACTION类的操作
    public enum OpKind {
        TRANSACTION,
        READ
    }
    
    private int type;
    private String path;
    private OpKind opKind;
    
    // 默认是OpKind.TRANSACTION类操作
    public static class Create extends Op {

        protected byte[] data;
        protected List acl;
        protected int flags;  
    }
    // 默认是OpKind.TRANSACTION类操作
    public static class CreateTTL extends Create {}
    // 默认是OpKind.TRANSACTION类操作
    public static class Delete extends Op {

        private int version;
    }
    // 默认是OpKind.TRANSACTION类操作
    public static class SetData extends Op {

        private byte[] data;
        private int version;
    }
    // 默认是OpKind.TRANSACTION类操作
    public static class Check extends Op {

        private int version;
    }

    // 默认是OpKind.READ类操作
    public static class GetChildren extends Op {
        GetChildren(String path) {
            super(ZooDefs.OpCode.getChildren, path, OpKind.READ);
        }
    }

    // 默认是OpKind.READ类操作
    public static class GetData extends Op {
        GetData(String path) {
            super(ZooDefs.OpCode.getData, path, OpKind.READ);
        }
    }
}

Create 、Delete 、SetData 以及Check 类都是通过如下图所示的方式,在实例化时调用父类构造器将OpKind.TRANSACTION作为它们实例的opKind。

而CreateTTL类则是在实例化时调用了父类Create的构造器,因此CreateTTL类也是将OpKind.TRANSACTION作为它们实例的opKind。

因此,可以知道Transaction类并没有实现事务的相应功能,只是提供了一个仅用于OpKind.TRANSACTION类操作的批量任务处理接口。而实际执行这些任务的是ZooKeeper类中的multi方法。

    
    public List multi(Iterable ops) throws InterruptedException, KeeperException {
        for (Op op : ops) {
            op.validate();
        }
        return multiInternal(generateMultiTransaction(ops));
    }


    // multi的异步版本
    public void multi(Iterable ops, MultiCallback cb, Object ctx) {
        List results = validatePath(ops);
        if (results.size() > 0) {
            cb.processResult(KeeperException.Code.BADARGUMENTS.intValue(), null, ctx, results);
            return;
        }
        multiInternal(generateMultiTransaction(ops), cb, ctx);
    }

注意:multi方法只能执行同一种OpKind类型的操作。

测试类:

package com.kaven.zookeeper;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;



public class Application implements Watcher {

    private static CountDownLatch latch;
    private static final String SERVER_PROXY = "192.168.1.184:9000";
    private static final int TIMEOUT = 40000;
    private static long time;

    private String watcherName;

    protected Application(String watcherName) {
        this.watcherName = watcherName;
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
        Watcher connectWatcher = new Application("connectWatcher");
        latch = new CountDownLatch(1);
        time = System.currentTimeMillis();
        ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, connectWatcher);
        latch.await();
        System.out.println(zk.getState());
        System.out.println("Connection complete!");

        String transactionMessage = "transaction success";
        Transaction transaction = zk.transaction()
                .create("/transaction", "transaction data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
                .setData("/transaction", "new data".getBytes(), -1)
                .check("/transaction", 100)
                .create("/transaction2", "transaction2 data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        transaction.commit(
               (rc, path, ctx, opResults) -> {
                   System.out.println(KeeperException.Code.get(rc).name());
                   opResults.forEach(Application::printOpResult);
                   if(rc == KeeperException.Code.OK.intValue()) {
                       System.out.println(ctx);
                   }
               },
               transactionMessage
        );

        String multiMessage = "multi success";
        List opList = new ArrayList<>();
        opList.add(Op.create("/multi", "multi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
        opList.add(Op.check("/multi", 1));
        opList.add(Op.setData("/multi", "new data".getBytes(), -1));
        opList.add(Op.create("/multi/son", "multi son".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL));
        opList.add(Op.check("/multi/son", 0));
        opList.add(Op.delete("/multi/son", -1));
        zk.multi(opList,
                (rc, path, ctx, opResults) -> {
                    System.out.println(KeeperException.Code.get(rc).name());
                    opResults.forEach(Application::printOpResult);
                    if(rc == KeeperException.Code.OK.intValue()) {
                        System.out.println(ctx);
                    }
                },
                multiMessage
        );

        Thread.sleep(1000000);
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("-----------------WatchedEvent------------------");
        System.out.println(this.watcherName);
        System.out.println(watchedEvent.getType());
        System.out.println(watchedEvent.getState().name());
        System.out.println(watchedEvent.getPath());
        System.out.println("time use(ms):" + (System.currentTimeMillis() - time));
        time = System.currentTimeMillis();
        System.out.println("-----------------WatchedEvent------------------");
        if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
            latch.countDown();
        }
    }

    public static void printOpResult(OpResult opResult) {
        System.out.println("-----------------printOpResult------------------");
        if(opResult instanceof OpResult.CreateResult) {
            System.out.println("CreateResult");
            System.out.println(((OpResult.CreateResult) opResult).getPath());
        }
        else if(opResult instanceof  OpResult.CheckResult) {
            System.out.println("CheckResult");
        }
        else if(opResult instanceof  OpResult.GetDataResult) {
            System.out.println("GetDataResult");
            System.out.println(new String(((OpResult.GetDataResult) opResult).getData()));
        }
        else if(opResult instanceof  OpResult.SetDataResult) {
            System.out.println("SetDataResult");
            System.out.println(((OpResult.SetDataResult) opResult).getStat().getDataLength());
        }
        else if(opResult instanceof  OpResult.GetChildrenResult) {
            System.out.println("GetChildrenResult");
            ((OpResult.GetChildrenResult) opResult).getChildren().forEach(System.out::println);
        }
        else if(opResult instanceof  OpResult.DeleteResult) {
            System.out.println("DeleteResult");
        }
        else if(opResult instanceof  OpResult.ErrorResult) {
            System.out.println("ErrorResult");
            int errorCode = ((OpResult.ErrorResult) opResult).getErr();
            System.out.println(KeeperException.Code.get(errorCode).name());
        }
        System.out.println("-----------------printOpResult------------------");
    }
}

输出结果:

-----------------WatchedEvent------------------
connectWatcher
None
SyncConnected
null
time use(ms):13703
-----------------WatchedEvent------------------
ConNECTED
Connection complete!
BADVERSION
-----------------printOpResult------------------
ErrorResult
OK
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
OK
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
BADVERSION
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
RUNTIMEINConSISTENCY
-----------------printOpResult------------------
BADVERSION
-----------------printOpResult------------------
ErrorResult
OK
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
BADVERSION
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
RUNTIMEINConSISTENCY
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
RUNTIMEINConSISTENCY
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
RUNTIMEINConSISTENCY
-----------------printOpResult------------------
-----------------printOpResult------------------
ErrorResult
RUNTIMEINConSISTENCY
-----------------printOpResult------------------

使用commit方法和multi方法来执行批量任务是类似的,毕竟commit方法就是调用multi方法来执行批量任务的:

    public List commit() throws InterruptedException, KeeperException {
        return zk.multi(ops);
    }

    // 异步版
    public void commit(MultiCallback cb, Object ctx) {
        zk.multi(ops, cb, ctx);
    }

调用multi方法执行批量任务能保证批量任务的原子性吗?Java客户端还在运行(会话还没结束):

而在其他客户端上并没有查询到/transaction和/multi这两个节点(但创建/transaction和/multi这两个节点的操作的返回码都是OK,很显然被回滚了):

很明显两次批量任务执行失败都是因为check操作执行不成功导致的:

check("/transaction", 100)
opList.add(Op.check("/multi", 1));

如果删除这两个操作,两次批量任务都会执行成功:

-----------------WatchedEvent------------------
connectWatcher
None
SyncConnected
null
time use(ms):13755
-----------------WatchedEvent------------------
ConNECTED
Connection complete!
OK
-----------------printOpResult------------------
CreateResult
/transaction
-----------------printOpResult------------------
-----------------printOpResult------------------
SetDataResult
8
-----------------printOpResult------------------
-----------------printOpResult------------------
CreateResult
/transaction2
-----------------printOpResult------------------
transaction success
OK
-----------------printOpResult------------------
CreateResult
/multi
-----------------printOpResult------------------
-----------------printOpResult------------------
SetDataResult
8
-----------------printOpResult------------------
-----------------printOpResult------------------
CreateResult
/multi/son
-----------------printOpResult------------------
-----------------printOpResult------------------
CheckResult
-----------------printOpResult------------------
-----------------printOpResult------------------
DeleteResult
-----------------printOpResult------------------
multi success

如下所示的输出,是批量任务中的某个操作执行失败导致的:

// 前面的操作执行成功了(后面的操作执行失败后回滚,才能保证事务)
-----------------printOpResult------------------
ErrorResult
OK
-----------------printOpResult------------------
// 执行该操作失败了,版本不匹配(BADVERSION),就是在检查版本时,输入的版本与节点的版本不匹配
-----------------printOpResult------------------
ErrorResult
BADVERSION
-----------------printOpResult------------------
// 后面的操作不需要再执行,执行错误的原因都是:RUNTIMEINCONSISTENCY
-----------------printOpResult------------------
ErrorResult
RUNTIMEINConSISTENCY
-----------------printOpResult------------------

为什么说multi方法只能执行同一种OpKind类型的操作。如果multi方法执行如下操作(同时存在OpKind.TRANSACTION类和OpKind.READ类操作):

        List opList = new ArrayList<>();
        opList.add(Op.create("/multi", "multi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
        opList.add(Op.setData("/multi", "new data".getBytes(), -1));
        opList.add(Op.getData("/multi"));
        opList.add(Op.create("/multi/son", "multi son".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL));
        opList.add(Op.check("/multi/son", 0));
        opList.add(Op.delete("/multi/son", -1));

会报错(multi请求中不允许混合读写操作):

    private void setOrCheckOpKind(Op.OpKind ok) throws IllegalArgumentException {
        if (opKind == null) {
            opKind = ok;
        } else if (ok != opKind) {
            throw new IllegalArgumentException("Mixing read and write operations (transactions)"
                                               + " is not allowed in a multi request.");
        }
    }

在setOrCheckOpKind方法中会检查每个操作的OpKind类型,opKind会保存操作的OpKind类型,如果出现OpKind类型不同的操作,就会抛出异常(进入else if分支)。

Java客户端执行批量任务和Transaction API介绍就到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/591022.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号