栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RocketMq源码刨析之分布式事务

RocketMq源码刨析之分布式事务

RocketMq源码刨析

想必大家都比较熟悉RocketMQ,阿里开源消息队列项目。对于队列来说可以直接强势得理解成,处理并非、分布式事务得敌虫。

[源码地址]: https://github.com/apache/rocketmq RocketMq4.3版本 支持分布式事物 案例入口【org.apache.rocketmq.example.transaction.TransactionProducer】
    //实现监听
    TransactionListener transactionListener = new TransactionListenerImpl();
    //生产者本地初始化
    TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("client-transaction-msg-check-thread");
            return thread;
        }
    });
    //设置线程池
    producer.setExecutorService(executorService);
    //设置生产者本地事务得回调组件
    producer.setTransactionListener(transactionListener);
    //开启消息处理
    producer.start();
案例入口【org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendMessageInTransaction】
 public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                       final LocalTransactionExecuter localTransactionExecuter, final Object arg)
     throws MQClientException {
     //获取之前注册得TransactionListener本地事务回调组件
     TransactionListener transactionListener = getCheckListener();
     if (null == localTransactionExecuter && null == transactionListener) {
         throw new MQClientException("tranExecutor is null", null);
     }
     //验证消息
     Validators.checkMessage(msg, this.defaultMQProducer);

     SendResult sendResult = null;
     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
     MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
     try {
         //发送消息
         sendResult = this.send(msg);
     } catch (Exception e) {
         throw new MQClientException("send message Exception", e);
     }

     LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
     Throwable localException = null;

     //获取发送消息回调结果
     switch (sendResult.getSendStatus()) {
         case SEND_OK: {//发送成功
             try {
                 if (sendResult.getTransactionId() != null) {
                     msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                 }
                 String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                 if (null != transactionId && !"".equals(transactionId)) {
                     msg.setTransactionId(transactionId);
                 }

                 //开启了本地事务回调组件才会进行回调处理
                 if (null != localTransactionExecuter) {
                     localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                 } else if (transactionListener != null) {
                     log.debug("Used new transaction API");
                     //执行本地事务
                     localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                 }
                 if (null == localTransactionState) {
                     localTransactionState = LocalTransactionState.UNKNOW;
                 }

                 if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                     log.info("executeLocalTransactionBranch return {}", localTransactionState);
                     log.info(msg.toString());
                 }
             } catch (Throwable e) {
                 log.info("executeLocalTransactionBranch exception", e);
                 log.info(msg.toString());
                 localException = e;
             }
         }
         break;
         case FLUSH_DISK_TIMEOUT:
         case FLUSH_SLAVE_TIMEOUT:
         case SLAVE_NOT_AVAILABLE:
             localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
             break;
         default:
             break;
     }

     try {
         //根据本地事务执行的结果去发送commit消息或者rollback消息
         this.endTransaction(sendResult, localTransactionState, localException);
     } catch (Exception e) {
         log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
     }

     TransactionSendResult transactionSendResult = new TransactionSendResult();
     transactionSendResult.setSendStatus(sendResult.getSendStatus());
     transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
     transactionSendResult.setMsgId(sendResult.getMsgId());
     transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
     transactionSendResult.setTransactionId(sendResult.getTransactionId());
     transactionSendResult.setLocalTransactionState(localTransactionState);
     return transactionSendResult;
 }
总要节点

获取之前注册得TransactionListener本地事务回调组件:TransactionListener transactionListener = getCheckListener();验证消息: Validators.checkMessage(msg, this.defaultMQProducer);发送消息: sendResult = this.send(msg);获取发送消息回调结果:switch (sendResult.getSendStatus())如果开启事务transactionListener,执行本地事务:localTransactionState = transactionListener.executeLocalTransaction(msg, arg);根据本地事务执行的结果去发送commit消息或者rollback消息:this.endTransaction(sendResult, localTransactionState, localException); 本地事务逻辑 案例入口【org.apache.rocketmq.example.transaction.executeLocalTransaction】

  @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        //这里会执行本地业务逻辑,此处省略...
        //返回本地事物的执行结果(UNKNOW、commit、rollback)
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }
Netty线程检查事务状态 案例入口【org.apache.rocketmq.example.transaction.checkLocalTransaction】
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        //实现本地事务处理得结果逻辑
        //TODO 业务数据
        //比如本地业务想表A插入数据,那么此处可以去表A查询数据是否存在,就可以指导本地事务是否成功
        //根据本地事务响应得到结果,返回不同得状态。
        //本地事物执行成功返回COMMIT_MESSAGE,反之失败返回ROLLBACK_MESSAGE
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

业务场景源码正在创作…

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/711543.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号