本次演示api
broker MessageFilterorg.apache.rocketmq rocketmq-spring-boot-starter 2.2.1
public interface MessageFilter {
boolean isMatchedByConsumeQueue(final Long tagsCode,
final ConsumeQueueExt.CqExtUnit cqExtUnit);
boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,
final Map properties);
}
DefaultMessageFilter
public class DefaultMessageFilter implements MessageFilter {
private SubscriptionData subscriptionData;
public DefaultMessageFilter(final SubscriptionData subscriptionData) {
this.subscriptionData = subscriptionData;
}
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == tagsCode || null == subscriptionData) {
return true;
}
if (subscriptionData.isClassFilterMode()) {
return true;
}
return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
|| subscriptionData.getCodeSet().contains(tagsCode.intValue());
}
@Override
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map properties) {
return true;
}
}
consumer
PullAPIWrapper - 丢失场景1
// 消息的tag过滤
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
final SubscriptionData subscriptionData) {
PullResultExt pullResultExt = (PullResultExt) pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) { //查找到了对应的消息
ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
List msgList = MessageDecoder.decodes(byteBuffer); //本次拉取的全部消息
List msgListFilterAgain = msgList;
if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { //todo 如果是需要做过滤(数组>0) && 非ClassFilterMode
msgListFilterAgain = new ArrayList(msgList.size()); // 新的匹配的消息队列
for (MessageExt msg : msgList) {
if (msg.getTags() != null) {
if (subscriptionData.getTagsSet().contains(msg.getTags())) { //todo 二次过滤 客户端过滤 根据string类型的tag过滤 不匹配则消息丢失了
msgListFilterAgain.add(msg); // todo 开始做过滤,如果是对应的,就放到msgListFilterAgain
}
}
}
}
.....................
// 设置匹配消息的结果集
pullResultExt.setMsgFoundList(msgListFilterAgain);
}
pullResultExt.setMessageBinary(null);
return pullResult;
}
消息丢失
- 这张图是因为多个Consumer实例(订阅同个ConsumerTopic,不同的Tag),这个由于负载均衡把
TagB的推给了订阅TagC的
public class SubscriptionData implements Comparable{ public final static String SUB_ALL = "*"; private boolean classFilterMode = false; private String topic; // topic private String subString; // 标签 Tag 比如TagA || TagB || TagC private Set tagsSet = new HashSet (); // tag private Set codeSet = new HashSet (); // 保存每个tag的hashcode private long subVersion = System.currentTimeMillis(); // 现在的时间搓
SubscriptionData subscriptionData = new SubscriptionData(); subscriptionData.setTopic(topic); subscriptionData.setSubString(subString); //标签 Tag
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
String subString) throws Exception {
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString); //标签 Tag
if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
String[] tags = subString.split("\|\|"); // 是否有多个 根据|分隔 TagA | TagB | TagC
if (tags.length > 0) {
for (String tag : tags) { // 遍历tag
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
subscriptionData.getTagsSet().add(trimString); // 添加tag
subscriptionData.getCodeSet().add(trimString.hashCode()); // 添加tag的hashcode
}
}
}
} else {
throw new Exception("subString split error");
}
}
return subscriptionData;
}
拉取消息请求头求头看是没有带Tag信息的
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); // 消费组 requestHeader.setTopic(mq.getTopic()); // Topic requestHeader.setQueueId(mq.getQueueId()); // queueId requestHeader.setQueueOffset(offset); // offset requestHeader.setMaxMsgNums(maxNums); //拉取消息数 requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subexpression); requestHeader.setSubVersion(subVersion); // 其实就是现在时间 requestHeader.setexpressionType(expressionType); // 默认expressionType.TAGBrokerController registerProcessor
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);PullMessageProcessor DefaultMQPushConsumerImpl - 丢失场景2 spring boot调试 订阅TagC
@Service @RocketMQMessageListener(topic = "QuickOrder", consumerGroup = "QuickOrderGroup",selectorexpression="TagC") public class TestTopicListener1 implements RocketMQListener订阅TagA|TagB{ protected Logger logger = LoggerFactory.getLogger(TestTopicListener1.class); @Override public void onMessage(String message) { System.out.println("TestTopicListener1 receive message =" +message); logger.info("TestTopicListener1 receive message ={}", message); } }
@Service @RocketMQMessageListener(topic = "QuickOrder", consumerGroup = "QuickOrderGroup",selectorexpression="TagA||TagB") public class TestTopicListener2 implements RocketMQListenerrocketmq-console查看{ protected Logger logger = LoggerFactory.getLogger(TestTopicListener2.class); @Override public void onMessage(String message) { System.out.println("TestTopicListener2 receive message =" +message); logger.info("TestTopicListener2 receive message ={}", message); } }
订阅的TagC已经丢失了
例子 发送10条消息send i=0,tag=TagA SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065B3240000, offsetMsgId=AC10046500002A9F00000000013EBA49, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=3], queueOffset=11] send i=1,tag=TagB SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065B7140001, offsetMsgId=AC10046500002A9F00000000013EBB0F, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=0], queueOffset=9] send i=2,tag=TagC SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065BAFF0002, offsetMsgId=AC10046500002A9F00000000013EBBD5, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=1], queueOffset=9] send i=3,tag=TagA SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065BEEB0003, offsetMsgId=AC10046500002A9F00000000013EBC9B, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=2], queueOffset=11] send i=4,tag=TagB SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065C2D50004, offsetMsgId=AC10046500002A9F00000000013EBD61, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=3], queueOffset=12] send i=5,tag=TagC SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065C6BF0005, offsetMsgId=AC10046500002A9F00000000013EBE27, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=0], queueOffset=10] send i=6,tag=TagA SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065CAA90006, offsetMsgId=AC10046500002A9F00000000013EBEED, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=1], queueOffset=10] send i=7,tag=TagB SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065CE940007, offsetMsgId=AC10046500002A9F00000000013EBFB3, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=2], queueOffset=12] send i=8,tag=TagC SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065D27D0008, offsetMsgId=AC10046500002A9F00000000013EC079, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=3], queueOffset=13] send i=9,tag=TagA SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065D6680009, offsetMsgId=AC10046500002A9F00000000013EC13F, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=0], queueOffset=11]消息订阅显示
10条数据丢失了6条
2021-12-10 09:32:51.373 INFO 141712 --- [MessageThread_5] c.k.r.listener.TestTopicListener2 : TestTopicListener2 receive message =Hello RocketMQ 0,tag=TagA TestTopicListener2 receive message =Hello RocketMQ 3,tag=TagA 2021-12-10 09:32:54.382 INFO 141712 --- [MessageThread_6] c.k.r.listener.TestTopicListener2 : TestTopicListener2 receive message =Hello RocketMQ 3,tag=TagA TestTopicListener2 receive message =Hello RocketMQ 4,tag=TagB 2021-12-10 09:32:55.383 INFO 141712 --- [MessageThread_7] c.k.r.listener.TestTopicListener2 : TestTopicListener2 receive message =Hello RocketMQ 4,tag=TagB TestTopicListener2 receive message =Hello RocketMQ 7,tag=TagB 2021-12-10 09:32:58.390 INFO 141712 --- [MessageThread_8] c.k.r.listener.TestTopicListener2 : TestTopicListener2 receive message =Hello RocketMQ 7,tag=TagB结论
- TagC的消息丢失了
- (i=1、2、5、6、8、9)丢失,10条数据丢失了6条
// todo lost message 同个consumerTopic 就会被替换 public boolean updateSubscription(final SetsubList) { boolean updated = false; for (SubscriptionData sub : subList) { SubscriptionData old = this.subscriptionTable.get(sub.getTopic()); if (old == null) { SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub); if (null == prev) { updated = true; } } else if (sub.getSubVersion() > old.getSubVersion()) { // 比较时间 时间后面的替换时间前面的(不根据网络先后顺序) if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) { log.info("subscription changed, group: {} OLD: {} NEW: {}", this.groupName, old.toString(), sub.toString() ); } this.subscriptionTable.put(sub.getTopic(), sub); // 替换 } }



