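The principles behind RocketMQ half-transaction messages are well documented and can be looked up online.
Implementation code
1. First, add the following utility classes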
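import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RocketMQExecutorServiceConfig {
    private static final AtomicInteger counter = new AtomicInteger(1);

    // Names each pool thread so it is easy to identify in logs and thread dumps.
    private static class RocketMQThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("RocketMQThreadFactory." + counter.getAndIncrement());
            return thread;
        }
    }

    // Thread pool handed to TransactionMQProducer, which uses it to process
    // transaction check-back requests from the broker.
    @Bean("rocketMQExecutorService")
    public ExecutorService executorService() {
        return new ThreadPoolExecutor(
                8,                  // core pool size
                24,                 // maximum pool size
                60,                 // keep-alive time for idle threads
                TimeUnit.SECONDS,
                // The queue's type argument and capacity were lost in the source;
                // 1000 is a placeholder assumption, not the original value.
                new ArrayBlockingQueue<Runnable>(1000),
                new RocketMQThreadFactory()
        );
    }
}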
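The factory above gives every pool thread a recognizable name. The following stdlib-only sketch shows the same idea in isolation; the class name `NamedThreadFactoryDemo`, the helper `runOnce`, and the pool and queue sizes are illustrative assumptions, not values from the original configuration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactoryDemo {

    /** Names threads "<prefix>.<n>", counting from 1. */
    static final class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(1);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName(prefix + "." + counter.getAndIncrement());
            return thread;
        }
    }

    /** Runs one task on a small bounded pool and returns the worker thread's name. */
    static String runOnce(String prefix) {
        ExecutorService pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(16),   // illustrative capacity
                new NamedThreadFactory(prefix));
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("RocketMQThreadFactory"));
    }
}
```

A bounded queue such as `ArrayBlockingQueue` keeps a backlog of pending tasks from growing without limit; once the queue and the maximum pool size are both exhausted, the executor's rejection policy applies.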
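LocalExecutor
// Wraps one local-transaction call so it can be passed as the arg of sendMessageInTransaction.
public interface LocalExecutor<T> {
    // Runs the local transaction (the original service call).
    void execute() throws Exception;
    // Returns the value produced by execute(), or null if it did not succeed.
    T getResult();
    // Returns the result wrapped in a ResultDTO, or rethrows the exception raised by execute().
    ResultDTO<T> resultDtoOrThrow() throws Exception;
}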
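EetMQProducer
@Slf4j
@Configuration
public class EetMQProducer {
    // NameServer address and application name. How they are populated
    // (for example through property injection) is not shown in the source.
    private String namesrv;
    private String appName;

    @Autowired
    public ExecutorService executorService;
    @Autowired
    private EetEeartagBagAppService eetEeartagBagAppService;

    private EetTransactionRecordMapper eetTransactionRecordMapper;
    // This declaration was truncated in the source. The field name comes from the
    // constructor below; the type arguments are unknown, so the raw type is shown.
    private basePService eetTransactionRecordPoService;

    public EetMQProducer(EetTransactionRecordMapper eetTransactionRecordMapper) {
        this.eetTransactionRecordMapper = eetTransactionRecordMapper;
        this.eetTransactionRecordPoService = new basePService<>(eetTransactionRecordMapper);
    }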
    public class TransactionListenerImpl implements TransactionListener {
        // Runs the local transaction once the half message has been sent successfully.
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            try {
                LocalExecutor localExecutor = (LocalExecutor) arg;
                localExecutor.execute();
                return LocalTransactionState.COMMIT_MESSAGE;
            } catch (Exception e) {
                log.error("Local transaction failed, rolling back half message, txId={}", msg.getTransactionId(), e);
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }

        // Called by the broker when it never received a commit or rollback for a half message.
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            EetTransactionRecordPo transactionRecordPo = eetTransactionRecordMapper.getByTxid(msg.getTransactionId());
            if (transactionRecordPo == null) {
                // No record yet: the local transaction may still be running, so keep
                // answering UNKNOW for the first minute and roll back after that.
                long timePassed = System.currentTimeMillis() - msg.getBornTimestamp();
                if (timePassed < 1000 * 60) {
                    return LocalTransactionState.UNKNOW; // UNKNOW is the enum constant's actual spelling
                } else {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
            if (StringUtils.equals(EetTransactionRecordStatusEnum.FINISHED.getStatus(), transactionRecordPo.getStatus())) {
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
    }
    // Builds and starts the transactional producer, and exposes it as a Spring bean.
    @Bean
    public TransactionMQProducer init() throws MQClientException {
        String group = "EetMQProducer-" + appName;
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer(group);
        producer.setNamesrvAddr(namesrv);
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
        return producer;
    }
}
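The check-back rule in `checkLocalTransaction` can be read as a pure function of two inputs: the status of the transaction record (if any) and the age of the message. The sketch below isolates that rule so it can be tested without a broker. The `State` enum is a stand-in for RocketMQ's `LocalTransactionState`, and the literal `"FINISHED"` is an assumed placeholder for whatever `EetTransactionRecordStatusEnum.FINISHED.getStatus()` returns.

```java
class CheckBackDecisionDemo {

    /** Stand-in for RocketMQ's LocalTransactionState; not the real enum. */
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    /** How long a missing record is tolerated before the message is rolled back. */
    static final long GRACE_PERIOD_MS = 60_000L;

    /**
     * @param recordStatus  status of the transaction record, or null if no record exists
     * @param bornTimestamp time the message was created, in epoch milliseconds
     * @param now           current time, in epoch milliseconds
     */
    static State decide(String recordStatus, long bornTimestamp, long now) {
        if (recordStatus == null) {
            // The local transaction may still be running: wait during the grace period.
            return (now - bornTimestamp < GRACE_PERIOD_MS) ? State.UNKNOWN : State.ROLLBACK;
        }
        // "FINISHED" is an assumed status value for this sketch.
        return "FINISHED".equals(recordStatus) ? State.COMMIT : State.ROLLBACK;
    }

    public static void main(String[] args) {
        System.out.println(decide(null, 0L, 1_000L));
        System.out.println(decide(null, 0L, 120_000L));
        System.out.println(decide("FINISHED", 0L, 1_000L));
    }
}
```

Returning the unknown state while the record is missing matters because the broker may check back while the local transaction is still in flight; an immediate rollback at that point could discard a message whose business data is about to be committed.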
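2. Changes to the service layer
1) Wrap the logic of the original service method in try/finally and write the transaction record at the end. Note that some primary-key IDs used to be generated inside the service method. Because the message is now sent before the service method runs, the ID must be generated before the service call and passed in as a parameter.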
@Override
public EetEeartagBagGrantResDto save(Long bagId, EetEeartagBagGrantReqDto reqDto, Message msg) {
    LocalDateTime startTime = LocalDateTime.now();
    Exception exception = null;
    try {
        ...
        return result;
    } catch (Exception e) {
        exception = e;
        throw e;
    } finally {
        // Write the transaction record only when the business logic succeeded, so that
        // checkLocalTransaction can later find it by transaction id.
        if (exception == null) {
            eetTransactionRecordAppService.save(msg.getTransactionId(), startTime, EetTransactionRecordStatusEnum.FINISHED.getStatus(), reqDto);
        }
    }
}
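The try/finally with an exception flag has the same effect as writing the record directly after the business logic returns normally. The sketch below shows that equivalent shape with an in-memory map standing in for the transaction-record table. `TransactionRecordDemo`, `runAndRecord`, and the `"FINISHED"` literal are hypothetical names for illustration. In the real service the record write and the business writes are assumed to share one database transaction, which this sketch does not model.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class TransactionRecordDemo {

    /** In-memory stand-in for the transaction-record table: txId -> status. */
    static final Map<String, String> RECORDS = new ConcurrentHashMap<>();

    /**
     * Runs the business logic and records the transaction as finished only if
     * the logic returned normally. If the logic throws, no record is written.
     */
    static <T> T runAndRecord(String txId, Supplier<T> businessLogic) {
        T result = businessLogic.get();   // an exception here skips the record write
        RECORDS.put(txId, "FINISHED");
        return result;
    }

    public static void main(String[] args) {
        String saved = runAndRecord("tx-1", () -> "saved");
        System.out.println(saved + ", record: " + RECORDS.get("tx-1"));
        try {
            runAndRecord("tx-2", () -> {
                throw new IllegalStateException("boom");
            });
        } catch (IllegalStateException expected) {
            System.out.println("tx-2 failed, record present: " + RECORDS.containsKey("tx-2"));
        }
    }
}
```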
2) Wrap the original call to the service layer in a LocalExecutor
@Component
public class EetEeartagBagCreatedLocalExecutorFactory {
    @Autowired
    private EetEeartagBagAppService eetEeartagBagAppService;

    public LocalExecutor<EetEeartagBagGrantResDto> create(Long bagId, EetEeartagBagGrantReqDto reqDto, Message msg) {
        return new EetEeartagBagCreatedLocalExecutor(bagId, reqDto, msg);
    }

    // Inner (non-static) class, so it can use the service injected into the enclosing factory.
    public class EetEeartagBagCreatedLocalExecutor implements LocalExecutor<EetEeartagBagGrantResDto> {
        private final Long bagId;
        private final EetEeartagBagGrantReqDto reqDto;
        private final Message msg;
        private EetEeartagBagGrantResDto result;
        private Exception exception;

        public EetEeartagBagCreatedLocalExecutor(Long bagId, EetEeartagBagGrantReqDto reqDto, Message msg) {
            this.bagId = bagId;
            this.reqDto = reqDto;
            this.msg = msg;
        }

        // Runs the service call and remembers either its result or its exception.
        @Override
        public void execute() throws Exception {
            try {
                result = eetEeartagBagAppService.save(bagId, reqDto, msg);
            } catch (Exception e) {
                exception = e;
                throw e;
            }
        }

        @Override
        public EetEeartagBagGrantResDto getResult() {
            return result;
        }

        // Lets the controller surface the service exception, or return the result on success.
        @Override
        public ResultDTO<EetEeartagBagGrantResDto> resultDtoOrThrow() throws Exception {
            if (exception != null) {
                throw exception;
            }
            return ResultDTO.ok(result);
        }
    }
}
3. Changes to the controller layer
1) Inject TransactionMQProducer
@Autowired
private TransactionMQProducer transactionMQProducer;
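2) The controller used to call the service method directly. It now creates a LocalExecutor and passes it as the second argument when sending the half-transaction message. The LocalExecutor is then run during the execute-local-transaction phase of the half message, and afterwards the controller can read the service method's result (or its exception) from the LocalExecutor.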
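// The method name and the ResultDTO type argument were lost in the source;
// `save` is an assumed name and the type argument is taken from resultDtoOrThrow().
@PostMapping(consumes = {"application/json"}, produces = {"application/json"})
public ResultDTO<EetEeartagBagGrantResDto> save(
        @ApiParam(value = "The object to save", required = true) @Valid @RequestBody EetEeartagBagGrantReqDto reqDto) throws Exception {
    // Generate the primary key before sending, so the message and the service call share it.
    long bagId = snowflakeIdWorker.nextId();
    JSONObject json = new JSONObject();
    json.put("bagId", bagId);
    Message msg = new Message(EetMQTopics.eet_eeartag_bag_created, "", json.toJSONString().getBytes(StandardCharsets.UTF_8));
    LocalExecutor<EetEeartagBagGrantResDto> localExecutor = eetEeartagBagCreatedLocalExecutorFactory.create(bagId, reqDto, msg);
    // The executor is handed to TransactionListenerImpl.executeLocalTransaction as `arg`.
    transactionMQProducer.sendMessageInTransaction(msg, localExecutor);
    return localExecutor.resultDtoOrThrow();
}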
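The control flow of the controller method can be traced without a broker. The sketch below is a simplified simulation, not RocketMQ code: `State`, `Executor`, `sendInTransaction`, and `handle` are hypothetical stand-ins for `LocalTransactionState`, `LocalExecutor`, the producer, and the controller. It also adds an `executed` flag, which is an assumption that goes beyond the original code: if the local transaction never runs (for instance because the half message was not accepted), the executor holds neither a result nor an exception, and without such a guard the caller would see an empty success.

```java
import java.util.concurrent.Callable;

class HalfMessageFlowDemo {

    /** Stand-in for RocketMQ's LocalTransactionState; not the real enum. */
    enum State { COMMIT, ROLLBACK }

    /** Simplified, hypothetical version of the LocalExecutor idea. */
    static final class Executor<T> {
        private final Callable<T> business;
        private boolean executed;
        private T result;
        private Exception exception;

        Executor(Callable<T> business) {
            this.business = business;
        }

        void execute() throws Exception {
            executed = true;
            try {
                result = business.call();
            } catch (Exception e) {
                exception = e;   // remember the failure for the caller
                throw e;
            }
        }

        T resultOrThrow() throws Exception {
            if (!executed) {
                // Added guard (assumption): the local transaction never ran.
                throw new IllegalStateException("local transaction was not executed");
            }
            if (exception != null) {
                throw exception;
            }
            return result;
        }
    }

    /** Stand-in for the producer: runs the local transaction and maps the outcome to a state. */
    static State sendInTransaction(Executor<?> executor) {
        try {
            executor.execute();
            return State.COMMIT;
        } catch (Exception e) {
            return State.ROLLBACK;
        }
    }

    /** Mimics the controller: send, then surface the result or the error message. */
    static String handle(Callable<String> business) {
        Executor<String> executor = new Executor<>(business);
        State state = sendInTransaction(executor);
        try {
            return state + ":" + executor.resultOrThrow();
        } catch (Exception e) {
            return state + ":" + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(() -> "bag-1"));
        System.out.println(handle(() -> {
            throw new IllegalStateException("boom");
        }));
    }
}
```

The listener swallows the service exception in order to return a rollback state, so keeping the exception inside the executor is what allows the controller to report the original failure to the caller.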



