如果扣款失败,取款依旧可能会成功,需要利用事务处理。
- 事务管理器TM 向事物协调器TC发送指令,开启事务。
- 工行系统发送一个给B赠款1万元的事物消息给TC。
- TC向Broker发送半事务消息prepareHalf,将消息M预提交到broker。此时建行系统看不到broker中的消息。
- Broker将预提交结果上报给TC。
- 预提交失败,则TC会向TM发送消息M预提交失败响应,全局事务结束;预提交成功,TC会调用工行系统回调操作,去完成工行用户A的预扣款1万元的操作。
- 工行系统会向TC发送预扣款结果通知
- TC向TM发送预扣款结果通知
COMMIT_MESSAGE 本地事务执行成功
ROLLBACK_MESSAGE 本地事务执行失败
UNKNOW 不确定,表示需要进行回查 - TM根据上报结果向TC发送不同确认指令
COMMIT_MESSAGE =>Global Commit
ROLLBACK_MESSAGE =>Global Rollback
UNKNOW =>触发工行回查操作。 - TC接收到指令后,会向Broker与工行系统发送确认指令
1)分布式事务:一次操作由若干分支操作组成,这些分支属于不用应用,分布在不同服务器。分布式事务需要保证这些分支操作要么全部成功,要么全部失败。分布式事务与普通事务一样,就是为了保证操作结果的一致性。
2)事务消息:Rocket MQ提供了类似x/openXA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务的解决方案,一种分布式事务处理模式。
3)半事务消息:暂不能消费消息。发送方成功发送到Broker,但Broker未收到最终确认指令,此时该消息被标记为暂不能投递状态,不能被消费者看到。
4)本地事务状态:Producer回调操作执行的结果为本地事务状态,其会发送给TC,而TC会再发送给TM。TM会根据TC发送来的本地事务状态来决定全局事务确认指令。
5)消息回查:重新查询本地事务的执行状态。
消息回查设置:
6)XA模式三剑客
TC、TM、RM(资源管理器)
事务代码: https://gitee.com/anilnak/rocket-mq/tree/dev/7.0.0
- 批量发送消息
发送限制:生产者进行消息发送时可以一次发送多条消息,这样可以大大提升Producer的发送效率。不过需要注意一下几点:
*批量发送的消息必须具有相同的Topic
*批量发送的消息必须具有相同的刷盘策略
*批量发送的消息不能是延时消息与事务消息
*批量发送的大小:总大小不能超过4M
如果想发送超过4M大小的消息,需要将消息进行拆分成若干个不大于4M的消息集合分多次批量发送;或者在Producer端与Broker端修改属性。
*Producer端需要在发送之前设置Producer端maxMessageSize属性
*Broker端需要修改加载的配置文件中的maxMessageSize属性 - 批量消费消息
Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列表,但默认情况下每次只能消费一条消息。需要修改Consumer的consumeMessageBatchMaxSize属性来指定。不过此值不可以超过32。默认情况下消费者每次可以拉取的消息最多是32条。若要修改一次拉取的最大值,可以通过Consumer的pullBatchSize属性来指定。值越大,拉取时间越长。
Tag 过滤:通过consumer的subscribe()方法指定要订阅消息的tag。
Sql过滤:默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性:enablePropertyFilter = true



