基于RocketMQ可以实现分布式事务的最终一致性解决方案,它使用TransactionMQProducer发送事务消息,事务消息的状态包括:Unknown 未知状态,初始发送的消息就是该状态,该状态下事务消息不会被消费者消费;Commit状态 提交状态,该状态下事务消息会被消费者消费;Rollback状态 回滚状态,事务消息会在broker端被移除,不会被消费者消费。它的处理流程是这样:首先生产者通过TransactionMQProducer发送事务消息到broker,此时事务消息的状态是Unkown状态,不会被消费者消费;在生产者发送事务消息成功后,生产者会通TransactionListner接口执行本地事务,根据执行结果发送事务消息状态到broker,如果是Commit状态,则事务消息此时会被消费者消费处理,如果是Rollback状态,则事务消息会被broker移除,并不会被消费者消费,如果事务消息状态一直处于Unkown状态,则broker会通过生产者的TransactionLisner接口回查事务状态,根据事务状态按照前面的方式进行处理。消费者消费消息和普通的消费方式一样,通过重试机制尽量确保消费成功,如果重试多次还是失败进行人工通知处理。
一 发送事务消息package com.tech.rocketmq.transaction;
import com.tech.rocketmq.jms.JmsConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
@Slf4j
@RestController
public class TransactionController {
@Autowired
private TransactionProducer transactionMQProducer;
@GetMapping("tran")
public Object callback(String tag,String otherParam) throws Exception{
Message message = new Message(JmsConfig.TOPIC, tag, tag + "_key", tag.getBytes());
TransactionSendResult sendResult = transactionMQProducer.getProducer().sendMessageInTransaction(message, otherParam);
log.info("发送结果={}",sendResult);
return new HashMap<>();
}
}
方便测试 otherParam参数与事务消息状态对应 1:此时会提交事务消息,消费者可以消费到 2:此时会回滚事务消息,broker端会移除事务消息,消费者无法消费到该条消息 3:broker定时回查事务消息状态
package com.tech.rocketmq.transaction;
import com.tech.rocketmq.jms.JmsConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
@Slf4j
@Component
public class TransactionProducer {
private String producerGroup = "trac_producer_group";
//事务监听器
private TransactionListener transactionListener = new TransactionListenerImpl();
private TransactionMQProducer producer = null;
private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
public TransactionProducer() {
producer=new TransactionMQProducer(producerGroup);
producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
producer.setTransactionListener(transactionListener);
producer.setExecutorService(executorService);
start();
}
public TransactionMQProducer getProducer(){
return this.producer;
}
public void start() {
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public void shutdown(){
this.producer.shutdown();
}
}
@Slf4j
class TransactionListenerImpl implements TransactionListener {
//在发送消息时发送线程会调用该方法,执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("===================executeLocalTransaction=====================");
String body = new String(msg.getBody());
String key = msg.getKeys();
String transactionId = msg.getTransactionId();
log.info("transactionId={},key={},body={}",transactionId,key,body);
//执行本地事务 begin TODO
//执行本地事务 end TODO
int status = Integer.parseInt(arg.toString());
if(status == 1){
//已确认状态 对待确认的消息进行确认,消费者可以消费该消息了
return LocalTransactionState.COMMIT_MESSAGE;
}
if(status==2){
//回滚状态 回滚消息,该状态下的消息会在broker端删除
return LocalTransactionState.ROLLBACK_MESSAGE;
}
if(status==3){
//未知状态,broker端会调用checkLocalTransaction回查本地事务
return LocalTransactionState.UNKNOW;
}
//超时未返回,和未知状态消息一样,也会在等待一定时间后调用checkLocalTransaction回查本地事务
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("===============checkLocalTransaction=================");
String body = new String(msg.getBody());
String key = msg.getKeys();
String transactionId = msg.getTransactionId();
log.info("transactionId={},key={},body={}",transactionId,key,body);
//要么commit 要么rollback
//可以根据key去检查本地事务消息是否完成
return LocalTransactionState.COMMIT_MESSAGE;
}
}
二 消费者
package com.tech.rocketmq.transaction;
import com.tech.rocketmq.jms.JmsConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class TransactionConsumer {
private DefaultMQPushConsumer consumer;
private String consumerGroup = "tran_consumer_group";
public TransactionConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(JmsConfig.TOPIC,"*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0);
String key = messageExt.getKeys();
try {
log.info("Receive New Message: {}",new String(messageExt.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
System.out.println("消费异常");
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
consumer.start();
System.out.println("consumer start ...");
}
}
在测试时,通过给otherParam传不同的值,来观测Transaction接口回查事务消息状态方法的调用情况和消费者的消费情况。



