
Consumer Message Loss Scenarios: Illustrated, with Source Code Analysis


rocketmq-client

API used in this demo


<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
	<version>2.2.1</version>
</dependency>

broker MessageFilter
public interface MessageFilter {
    
    boolean isMatchedByConsumeQueue(final Long tagsCode,
        final ConsumeQueueExt.CqExtUnit cqExtUnit);

    
    boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,
        final Map<String, String> properties);
}
DefaultMessageFilter
public class DefaultMessageFilter implements MessageFilter {

    private SubscriptionData subscriptionData;

    public DefaultMessageFilter(final SubscriptionData subscriptionData) {
        this.subscriptionData = subscriptionData;
    }

    @Override
    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
        if (null == tagsCode || null == subscriptionData) {
            return true;
        }

        if (subscriptionData.isClassFilterMode()) {
            return true;
        }

        return subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)
            || subscriptionData.getCodeSet().contains(tagsCode.intValue());
    }

    @Override
    public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
        return true;
    }
}
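The broker-side check above compares hash codes, not tag strings, so two different tags with the same hash code both pass it. The following is a minimal sketch of that behaviour, assuming the tagsCode stored for a message is the hash code of its tag string; `TagHashFilterSketch` and its two methods are hypothetical names used only for illustration, not RocketMQ classes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the broker-side check; not a RocketMQ class.
class TagHashFilterSketch {

    // Mirrors the final return statement of isMatchedByConsumeQueue:
    // match when subscribed to "*" or when the tag's hash code is in codeSet.
    static boolean matchesOnBroker(String subString, Set<Integer> codeSet, long tagsCode) {
        return "*".equals(subString) || codeSet.contains((int) tagsCode);
    }

    // Exact comparison on the tag string, as the consumer does after pulling.
    static boolean matchesOnClient(Set<String> tagsSet, String tag) {
        return tag != null && tagsSet.contains(tag);
    }

    public static void main(String[] args) {
        Set<String> tagsSet = new HashSet<>();
        Set<Integer> codeSet = new HashSet<>();
        tagsSet.add("Aa");
        codeSet.add("Aa".hashCode());

        // "Aa" and "BB" have the same String.hashCode() value (2112).
        System.out.println(matchesOnBroker("Aa", codeSet, "BB".hashCode())); // true
        System.out.println(matchesOnClient(tagsSet, "BB"));                  // false
    }
}
```

This is one reason a second, string-based filter on the consumer side is useful: the hash comparison can let through a message whose tag was never subscribed.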
consumer PullAPIWrapper - loss scenario 1
// Tag filtering of the pulled messages
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
	final SubscriptionData subscriptionData) {
	PullResultExt pullResultExt = (PullResultExt) pullResult;

	this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
	if (PullStatus.FOUND == pullResult.getPullStatus()) { // matching messages were found
		ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
		List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer); // all messages pulled in this request

		List<MessageExt> msgListFilterAgain = msgList;
		if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { // todo filter only when tags are set (tagsSet is non-empty) and not in ClassFilterMode
			msgListFilterAgain = new ArrayList<MessageExt>(msgList.size()); // new list for the matching messages
			for (MessageExt msg : msgList) {
				if (msg.getTags() != null) {
					if (subscriptionData.getTagsSet().contains(msg.getTags())) { // todo second, client-side filter on the tag string; a message that does not match is dropped, i.e. lost
						msgListFilterAgain.add(msg); // todo keep the message only if its tag matches
					}
				}
			}
		}

		.....................
		// store the list of matching messages as the result
		pullResultExt.setMsgFoundList(msgListFilterAgain);
	}

	pullResultExt.setMessageBinary(null);

	return pullResult;
}
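The second filtering step in processPullResult can be isolated in a few lines. This is a simplified sketch that works on bare tag strings instead of MessageExt objects; `ClientTagFilterSketch` and `filterAgain` are hypothetical names, and the pulled tags are made-up sample input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified version of the client-side tag filter.
class ClientTagFilterSketch {

    // Keeps only tags contained in tagsSet. An empty tagsSet means
    // "no filtering", matching the isEmpty() guard in processPullResult.
    static List<String> filterAgain(List<String> pulledTags, Set<String> tagsSet) {
        if (tagsSet.isEmpty()) {
            return pulledTags;
        }
        List<String> kept = new ArrayList<>(pulledTags.size());
        for (String tag : pulledTags) {
            if (tag != null && tagsSet.contains(tag)) {
                kept.add(tag);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Set<String> subscribedToTagC = new HashSet<>(Arrays.asList("TagC"));
        // Sample input: the broker handed this consumer TagA/TagB messages.
        List<String> pulled = Arrays.asList("TagA", "TagB", "TagB");
        System.out.println(filterAgain(pulled, subscribedToTagC)); // prints []
    }
}
```

Nothing in the pulled batch reaches the listener, and the filter itself gives no signal that messages were discarded.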
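Message loss
  • The figure shows multiple Consumer instances that subscribe to the same Topic with different
    Tags; because of load balancing, TagB messages are delivered to the instance subscribed to TagC.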

consumer asynchronous pull: FilterAPI SubscriptionData
public class SubscriptionData implements Comparable<SubscriptionData> {
    public final static String SUB_ALL = "*";
    private boolean classFilterMode = false;
    private String topic; // topic
    private String subString; // tag expression, e.g. TagA || TagB || TagC
    private Set<String> tagsSet = new HashSet<String>(); // tags
    private Set<Integer> codeSet = new HashSet<Integer>(); // hash code of each tag
    private long subVersion = System.currentTimeMillis(); // current timestamp
    // ... remaining members omitted in this excerpt
}
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
	String subString) throws Exception {
	SubscriptionData subscriptionData = new SubscriptionData();
	subscriptionData.setTopic(topic);
	subscriptionData.setSubString(subString); // tag expression

	if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
		subscriptionData.setSubString(SubscriptionData.SUB_ALL);
	} else {
		String[] tags = subString.split("\\|\\|"); // split on "||" to support several tags, e.g. TagA || TagB || TagC
		if (tags.length > 0) {
			for (String tag : tags) { // iterate over the tags
				if (tag.length() > 0) {
					String trimString = tag.trim();
					if (trimString.length() > 0) {
						subscriptionData.getTagsSet().add(trimString); // add the tag
						subscriptionData.getCodeSet().add(trimString.hashCode()); // add the tag's hash code
					}
				}
			}
		} else {
			throw new Exception("subString split error");
		}
	}

	return subscriptionData;
}
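To see what the splitting logic in buildSubscriptionData produces for a given expression, here is a small self-contained sketch of the same parsing rule. `SubExpressionSketch` and `parseTags` are hypothetical names, and the method returns only the tag set, not a full SubscriptionData.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical re-implementation of the tag-splitting rule, for illustration.
class SubExpressionSketch {

    // Returns the trimmed, non-empty tags of a "||"-separated expression.
    // An empty result stands for "subscribe to everything" (null, "" or "*").
    static Set<String> parseTags(String subString) {
        Set<String> tags = new LinkedHashSet<>();
        if (subString == null || subString.isEmpty() || "*".equals(subString)) {
            return tags;
        }
        for (String tag : subString.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                tags.add(trimmed);
            }
        }
        return tags;
    }

    public static void main(String[] args) {
        System.out.println(parseTags("TagA || TagB||TagC")); // prints [TagA, TagB, TagC]
        // A single "|" is not a separator, so this is one tag named "TagA|TagB".
        System.out.println(parseTags("TagA|TagB"));          // prints [TagA|TagB]
    }
}
```

The second call shows why the separator matters: only the double pipe splits the expression into several tags.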
Looking at the pull-message request header, it does not appear to carry Tag information:
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup); // consumer group
requestHeader.setTopic(mq.getTopic()); // topic
requestHeader.setQueueId(mq.getQueueId());  // queueId
requestHeader.setQueueOffset(offset); // offset
requestHeader.setMaxMsgNums(maxNums); // maximum number of messages to pull
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subexpression);
requestHeader.setSubVersion(subVersion);  // in effect, the time at which the subscription was created
requestHeader.setExpressionType(expressionType); // defaults to ExpressionType.TAG
BrokerController registerProcessor
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
PullMessageProcessor DefaultMQPushConsumerImpl - loss scenario 2: Spring Boot debugging

Subscribing to TagC
@Service
@RocketMQMessageListener(topic = "QuickOrder", consumerGroup = "QuickOrderGroup", selectorExpression = "TagC")
public class TestTopicListener1 implements RocketMQListener<String> {

    protected Logger logger = LoggerFactory.getLogger(TestTopicListener1.class);


    @Override
    public void onMessage(String message) {
        System.out.println("TestTopicListener1 receive message =" +message);
        logger.info("TestTopicListener1 receive message ={}", message);
    }

}
Subscribing to TagA||TagB
@Service
@RocketMQMessageListener(topic = "QuickOrder", consumerGroup = "QuickOrderGroup", selectorExpression = "TagA||TagB")
public class TestTopicListener2 implements RocketMQListener<String> {

    protected Logger logger = LoggerFactory.getLogger(TestTopicListener2.class);


    @Override
    public void onMessage(String message) {
        System.out.println("TestTopicListener2 receive message =" +message);
        logger.info("TestTopicListener2 receive message ={}", message);
    }

}
Viewing the subscription in rocketmq-console

The TagC subscription has been lost

Example: sending 10 messages
send i=0,tag=TagA
SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065B3240000, offsetMsgId=AC10046500002A9F00000000013EBA49, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=3], queueOffset=11]
send i=1,tag=TagB
SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065B7140001, offsetMsgId=AC10046500002A9F00000000013EBB0F, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=0], queueOffset=9]
send i=2,tag=TagC
SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065BAFF0002, offsetMsgId=AC10046500002A9F00000000013EBBD5, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=1], queueOffset=9]
send i=3,tag=TagA
SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065BEEB0003, offsetMsgId=AC10046500002A9F00000000013EBC9B, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=2], queueOffset=11]
send i=4,tag=TagB
SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065C2D50004, offsetMsgId=AC10046500002A9F00000000013EBD61, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=3], queueOffset=12]
send i=5,tag=TagC
SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065C6BF0005, offsetMsgId=AC10046500002A9F00000000013EBE27, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=0], queueOffset=10]
send i=6,tag=TagA
SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065CAA90006, offsetMsgId=AC10046500002A9F00000000013EBEED, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=1], queueOffset=10]
send i=7,tag=TagB
SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065CE940007, offsetMsgId=AC10046500002A9F00000000013EBFB3, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=2], queueOffset=12]
send i=8,tag=TagC
SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065D27D0008, offsetMsgId=AC10046500002A9F00000000013EC079, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=3], queueOffset=13]
send i=9,tag=TagA
SendResult [sendStatus=SEND_OK, msgId=7F000001CD4018B4AAC23065D6680009, offsetMsgId=AC10046500002A9F00000000013EC13F, messageQueue=MessageQueue [topic=QuickOrder, brokerName=broker-m1, queueId=0], queueOffset=11]

Consumer output

6 of the 10 messages were lost

2021-12-10 09:32:51.373  INFO 141712 --- [MessageThread_5] c.k.r.listener.TestTopicListener2        : TestTopicListener2 receive message =Hello RocketMQ 0,tag=TagA
TestTopicListener2 receive message =Hello RocketMQ 3,tag=TagA
2021-12-10 09:32:54.382  INFO 141712 --- [MessageThread_6] c.k.r.listener.TestTopicListener2        : TestTopicListener2 receive message =Hello RocketMQ 3,tag=TagA
TestTopicListener2 receive message =Hello RocketMQ 4,tag=TagB
2021-12-10 09:32:55.383  INFO 141712 --- [MessageThread_7] c.k.r.listener.TestTopicListener2        : TestTopicListener2 receive message =Hello RocketMQ 4,tag=TagB
TestTopicListener2 receive message =Hello RocketMQ 7,tag=TagB
2021-12-10 09:32:58.390  INFO 141712 --- [MessageThread_8] c.k.r.listener.TestTopicListener2        : TestTopicListener2 receive message =Hello RocketMQ 7,tag=TagB

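Conclusion
  • All TagC messages (i=2, 5, 8) were lost.
  • In total, messages i=1, 2, 5, 6, 8, 9 were lost, 6 of the 10, including TagA and TagB messages (i=1, 6, 9).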
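The pattern of lost messages can be replayed from the send log with a small simulation. The tags and queue IDs below are copied from the SendResult lines. Everything else is an assumption made for this sketch: that the broker ends up holding only the TagA||TagB subscription, that the TagC listener is assigned queues 0 and 1 while the TagA||TagB listener is assigned queues 2 and 3 (inferred from which messages arrived), and that broker-side matching can be approximated by comparing tag strings instead of hash codes. `LossReplaySketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical replay of the 10-message example; not RocketMQ code.
class LossReplaySketch {

    // Tag and queueId of messages i=0..9, taken from the send log.
    static final String[] TAGS = {"TagA", "TagB", "TagC", "TagA", "TagB", "TagC", "TagA", "TagB", "TagC", "TagA"};
    static final int[] QUEUES = {3, 0, 1, 2, 3, 0, 1, 2, 3, 0};

    static List<Integer> consumed() {
        // Assumption: the broker kept only the most recent subscription.
        Set<String> brokerTags = new HashSet<>(Arrays.asList("TagA", "TagB"));
        // Assumption: queues 0-1 belong to listener 1, queues 2-3 to listener 2.
        Set<String> listener1Tags = new HashSet<>(Arrays.asList("TagC"));
        Set<String> listener2Tags = new HashSet<>(Arrays.asList("TagA", "TagB"));

        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < TAGS.length; i++) {
            if (!brokerTags.contains(TAGS[i])) {
                continue; // filtered on the broker: never sent to any consumer
            }
            Set<String> ownerTags = QUEUES[i] <= 1 ? listener1Tags : listener2Tags;
            if (ownerTags.contains(TAGS[i])) {
                result.add(i); // survives the client-side second filter
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(consumed()); // prints [0, 3, 4, 7]
    }
}
```

Under these assumptions the simulation yields exactly the four messages seen in the consumer log: TagC messages are dropped on the broker, and TagA/TagB messages that land on the TagC listener's queues are dropped by the client-side filter.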
Broker side: ConsumerGroupInfo SubscriptionData

updateSubscription
// todo lost message: a subscription for the same topic (within this consumer group) gets replaced
public boolean updateSubscription(final Set<SubscriptionData> subList) {
	boolean updated = false;

	for (SubscriptionData sub : subList) {
		SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
		if (old == null) {
			SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
			if (null == prev) {
				updated = true;
			}
		} else if (sub.getSubVersion() > old.getSubVersion()) {  
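		    // compare timestamps: the newer subVersion replaces the older one (regardless of the order in which requests arrive over the network)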
			if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
				log.info("subscription changed, group: {} OLD: {} NEW: {}",
					this.groupName,
					old.toString(),
					sub.toString()
				);
			}

			this.subscriptionTable.put(sub.getTopic(), sub); // replace
		}
	}
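The replacement rule in updateSubscription can be demonstrated without a broker. This is a minimal sketch that keeps only the map logic; `SubscriptionOverwriteSketch`, `Sub`, `update` and `current` are hypothetical names, and the version numbers are made-up values standing in for System.currentTimeMillis().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of one consumer group's subscription table.
class SubscriptionOverwriteSketch {

    // Minimal subscription: topic, tag expression and version timestamp.
    static final class Sub {
        final String topic;
        final String subString;
        final long subVersion;

        Sub(String topic, String subString, long subVersion) {
            this.topic = topic;
            this.subString = subString;
            this.subVersion = subVersion;
        }
    }

    // Keyed by topic only, so one group holds at most one expression per topic.
    private final ConcurrentMap<String, Sub> subscriptionTable = new ConcurrentHashMap<>();

    void update(Sub sub) {
        Sub old = subscriptionTable.get(sub.topic);
        if (old == null) {
            subscriptionTable.putIfAbsent(sub.topic, sub);
        } else if (sub.subVersion > old.subVersion) {
            subscriptionTable.put(sub.topic, sub); // newer version wins
        }
    }

    String current(String topic) {
        Sub sub = subscriptionTable.get(topic);
        return sub == null ? null : sub.subString;
    }

    public static void main(String[] args) {
        SubscriptionOverwriteSketch group = new SubscriptionOverwriteSketch();
        group.update(new Sub("QuickOrder", "TagC", 1000L));
        group.update(new Sub("QuickOrder", "TagA||TagB", 2000L));
        System.out.println(group.current("QuickOrder")); // prints TagA||TagB
    }
}
```

Because the table is keyed by topic, two consumers in the same group that subscribe to the same topic with different tags cannot both be represented; whichever carries the larger subVersion stays.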
	