rocketmq发送事务消息的过程为
1.发送half消息
2.消息队列返回发送结果
3.执行本地事务
4.提交/回滚/未知
5.如果第四步为未知,则消息队列会反查本地事务,本地事务查询后再通知消息队列最终提交或回滚
生产者
public class Producer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
TransactionMQProducer producer = new TransactionMQProducer("group_09");
producer.setNamesrvAddr("localhost:9876");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
System.out.println("执行本地事务");
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("执行补偿事务");
return LocalTransactionState.ROLLBACK_MESSAGE;
}
});
producer.start();
Message msg =
new Message("topic14", "tag1",
("Hello RocketMQ transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println("发送成功" + sendResult);
}
}
消费者
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_03");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic14", "*");
consumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println(messageExt);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}



