栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RocketMQ中事务消息

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RocketMQ中事务消息

基本流程图

定义事务生产者类

TransactionSyncProducer类如下

package com.cst.transaction;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;


public class TransactionSyncProducer {
    public static  void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
        //创建一个producer
        TransactionMQProducer producer = new TransactionMQProducer("dome_producer_transaction_group");
        //指定nameServer地址
        producer.setNamesrvAddr("192.168.12.140:9876");

        //指定消息监听对象,用于执行本地事务和消息回查
        TransactionListenerImpl transactionListener = new TransactionListenerImpl();
        producer.setTransactionListener(transactionListener);

        //线程池
        ExecutorService executorService = new ThreadPoolExecutor(
                2,
                5,
                100,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(
                        2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable runnable) {
                        Thread thread = new Thread(runnable);
                        thread.setName("client-transaction-msg-check-thread");
                        return thread;
                    }
                }
        );
        producer.setExecutorService(executorService);

        //开启生产者
        producer.start();
        //创建消息
        Message message = new Message("Topic_Transaction_Demo",
                "sometag",
                ("hello!-Transaction").getBytes(RemotingHelper.DEFAULT_CHARSET)
        );
        //发送事务消息
        TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");
        System.out.println(result);

        //关闭
        producer.shutdown();
    }
}


定义事务实现类

TransactionListenerImpl如下

package com.cst.transaction;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;


public class TransactionListenerImpl implements TransactionListener {

    private ConcurrentHashMap localTrans = new ConcurrentHashMap();

    
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        //事务ID
        String transactionId = msg.getTransactionId();

        //0:执行中,状态未知  1:本地事务执行成功 2:本地事务执行失败
        localTrans.put(transactionId, 0);

        //业务执行,处理本地事务,service
        System.out.println("hello!---Dome---Transaction");

        try {
            System.out.println("正在执行本地事务--");
            Thread.sleep(70000);
            System.out.println("正在执行本地事务--成功!");
            localTrans.put(transactionId, 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            localTrans.put(transactionId, 2);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }

        return LocalTransactionState.COMMIT_MESSAGE;
    }

    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        //获取对应事务ID状态
        String transactionId = msg.getTransactionId();
        //获取对应事务执行ID的执行状态
        Integer status = localTrans.get(transactionId);
        System.out.println("消息回查----transactionId" + transactionId + "-----状态:" + status);
        switch (status) {
            case 0:
                return LocalTransactionState.UNKNOW;
            case 1:
                return LocalTransactionState.COMMIT_MESSAGE;
            case 2:
                return LocalTransactionState.ROLLBACK_MESSAGE;
        }

        return LocalTransactionState.UNKNOW;
    }
}

执行结果

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/276252.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号