栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RocketMQ系列之事务消息

RocketMQ系列之事务消息

事务这个概念和内容这里就不提了哈,我们这里直接看RocketMQ怎么实现事务消息的,如果真的对事务不了解的,可以看一下这篇:

分布式事务系列之入门_阿小冰的博客-CSDN博客_分布式事务分布式事务系列之入门https://blog.csdn.net/qq_38377525/article/details/123159026


事务消息发送和消费

下面的架子是按照之前的一个框架继续开发的,有必要的话,可以参考一下:

RocketMQ系列之消息发送/消费初体验_阿小冰的博客-CSDN博客RocketMQ系列之消息发送/消费初体验https://blog.csdn.net/qq_38377525/article/details/123253718 1、发送创建订单的事务消息,预下单操作

这里定义一个订单的实体类Order,包含订单ID、订单标题两个属性即可

编写事务的模拟接口,代码如下:

@RestController
public class TransactionalController {
    @Autowired
    private Source source;

    @GetMapping("/transactional")
    public String tramsactional(){
        Order order=new Order("1","上海浦东");
        String transactionId=UUID.randomUUID().toString();
        MessageBuilder builder = MessageBuilder.withPayload(order).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId);
        Message message=builder.build();
        source.output().send(message);
        return "order is ok";
    }
}

2、配置文件

server.port=8070

spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=producer-demo-group

3、创建事务监听器

@RocketMQTransactionListener(txProducerGroup = "OrderTransactionGroup")
public class TransactionMsgListener implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            //获取前面订单生成的事务id
            String transactionId=(String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            //以事务ID为主键,执行本地事务
            Order order=(Order) message.getPayload();
            boolean result = this.saveOrder(order, transactionId);
            return result?RocketMQLocalTransactionState.COMMIT:RocketMQLocalTransactionState.ROLLBACK;
        }catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        //获取事务ID
        String transactionId=(String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
        //以事务ID为主键,查询本地事务执行情况
        if(isSuccess(transactionId)){
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }

    
    private boolean saveOrder(Order order,String transactionId){
        return true;
    }

    
    private boolean isSuccess(String transactionId){
        return true;
    }
}

使用@RocketMQTransactionListener注解用于接收本地事务的监听,txProducerGroup是事务组名称,和前面定义的OrderTransactionGroup保持一致,其中这个接口还有两个实现方法,也在上面的代码有所体现

executeLocalTransaction:执行本地事务,在消息发送成功会回调执行,一单事务提交成功,下游应用的消费者就能收到该消息checkLocalTransaction:检查本地事务执行状态,如果executeLocalTransaction方法中返回的状态是位置UNKNOWN或者为返回的状态,就会默认在预处理发送的一分钟后由Broker通知Producer检查本地事务,在Producer中回调本地事务监听器中的checkLocalTransaction方法,检查本地事务时,可以根据事务ID查询本地事务的状态,再返回具体事务状态给Broker

4、消费者不变,和上面提供链接中的消费者保持一致即可,然后启动验证即可


事务消息的底层原理

RocketMQ采用的2PC的方案,第一阶段生产者向Broker发送预处理消息,此时消息其实还没有投递出去,这个时候消费者还不能消费,第二阶段生产向Broker发送提交或回滚消息,具体流程如下:

发送预处理消息成功后,开始执行本地事务如果本地事务执行成功,发送提交请求提交事务消息,消息会投递给消费者

 

如果本地事务执行失败,发送回滚请求回滚事务消息,消息不会投递给消费者,看上图

 

 如果本地事务状态未知,因为网络故障或者生产者挂掉了,Broker没有收到二次确认的相关信息,就会由Broker端发送请求向生产者进行回查,询问并确认提交还是回滚,如果状态一直都是未确认,那就需要人工干预程序了,看上图

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/753652.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号