栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RocketMQ 发送事务消息 事务消息的执行流程

RocketMQ 发送事务消息 事务消息的执行流程

  1. 本地方法发送消息至borker
  2. borker 执行TransactionListener的方法。执行本地事务executeLocalTransaction
  3. 如果返回unkow状态,borker会回调checkLocalTransaction检查事务的结果,返回提交还是回滚
  4. borker设置消息对外可见,等待消费或主动推送

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

事务的流程如下图简述

 实例代码

    
    @Bean
    @Lazy
    public TransactionMQProducer defaultTransactionMQProducer() throws MQClientException {
        TransactionMQProducer producer = new TransactionMQProducer(TsitMQGroup.PROVIDER_INDUSTRY_GROUP);
        producer.setNamesrvAddr(ipcProps.getRocketMQServer());
        producer.start();
        log.info("init ipc defaultTransactionMQProducer");
        return producer;
    }

发送事务消息

        // 自定义线程池
        transactionMQProducer.setExecutorService(ThreadFactory.getInstance());
        transactionMQProducer.setTransactionListener(new TransactionListener() {

            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                List list = fp.stream().peek(it -> {
                    it.setId(idUtils.nextIdStr());
                    it.setOrganId(user.getOrganId());
                }).collect(Collectors.toList());
               boolean flag = cmFpMapper.saveBatch(list) != 0;
                
                // 需要borker回查就返回unkow
               return flag ? LocalTransactionState.COMMIT_MESSAGE :                         
                LocalTransactionState.ROLLBACK_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 此方法 borker并不会回查,因为在本地事务就返回了回滚还是提交
                // 当本地事务执行时返回unkow,borker回调用该该方法检查事务结果
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        transactionMQProducer.sendMessageInTransaction(
                new Message(TsitMQTopic.INDUSTRY_TOPIC, TsitMQTag.FP_CALLBACK, JSONUtil.toJsonStr(dto).getBytes(StandardCharsets.UTF_8)),
                null);

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/663857.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号