
RocketMQ Core Source Code Analysis, Part 19: Message Consumption in Five Steps: Consuming Messages


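Contents

Preface: PullCallback
Consumption flow diagram
ConsumeMessageOrderlyService: orderly consumption
    Consumer service constructor
    Submitting consume requests
    Executing consumption
ConsumeMessageConcurrentlyService: concurrent consumption
    Consumer service constructor
    Submitting consume requests
    Executing consumption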

Preface: PullCallback

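The broker responds to the MQ client, which then runs the pullCallback callback.
The callback stores the pulled messages in the consumption buffer, processQueue.
It then triggers the consumer threads to consume those messages.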

PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        // ...... other code omitted
        // Record response-time (RT) statistics
        long prevRequestOffset = pullRequest.getNextOffset();
        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
        long pullRT = System.currentTimeMillis() - beginTimestamp;
        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
            pullRequest.getMessageQueue().getTopic(), pullRT);
        long firstMsgOffset = Long.MAX_VALUE;
        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
            // No messages were returned, so pull again immediately
            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
        } else {
            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
            // Update the pull throughput statistics
            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
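            // Core step 1: add the pulled messages (32 by default) to processQueue's msgTreeMap
            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
            // Core step 2: wrap the pulled messages in consume requests and hand them to the consumer threads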
            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                pullResult.getMsgFoundList(),
                processQueue,
                pullRequest.getMessageQueue(),
                dispatchToConsume);
            // ...... hand the request back to PullRequestService for the next pull
        }
        // ...... other code omitted
    }
};
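The buffering step can be pictured with a small stand-in for processQueue. This is only a sketch under stated assumptions: `ProcessQueueSketch`, its `consuming` flag, and the use of `String` message bodies are hypothetical simplifications, not the RocketMQ class. It shows a sorted map keyed by offset and a return value that asks for a new consume request only when nothing is already draining the buffer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for ProcessQueue; not the RocketMQ class.
class ProcessQueueSketch {
    // Offset -> message body, kept sorted by offset.
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    // True while some consume request is responsible for draining the buffer.
    private boolean consuming = false;

    // Buffers the messages; returns true only when a new consume request is needed.
    synchronized boolean putMessage(long firstOffset, List<String> bodies) {
        boolean added = false;
        long offset = firstOffset;
        for (String body : bodies) {
            // put() returns null when the offset was not buffered before.
            if (msgTreeMap.put(offset++, body) == null) {
                added = true;
            }
        }
        boolean dispatchToConsume = false;
        if (added && !consuming) {
            consuming = true;
            dispatchToConsume = true;
        }
        return dispatchToConsume;
    }

    synchronized int size() {
        return msgTreeMap.size();
    }

    synchronized long firstOffset() {
        return msgTreeMap.firstKey();
    }

    public static void main(String[] args) {
        ProcessQueueSketch queue = new ProcessQueueSketch();
        System.out.println(queue.putMessage(100L, Arrays.asList("a", "b")));
        System.out.println(queue.putMessage(102L, Arrays.asList("c")));
        System.out.println(queue.size());
    }
}
```

In this sketch the second call returns false because the first call already claimed the draining role, which is the condition the orderly service relies on later.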
Consumption flow diagram

ConsumeMessageOrderlyService: orderly consumption

Consumer service constructor

Although this is orderly consumption, it still runs on multiple threads; ordering is guaranteed by locks.

    // Constructor of ConsumeMessageOrderlyService
    public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
                                    MessageListenerOrderly messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;

    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    this.consumeExecutor = new ThreadPoolExecutor(
            // Core pool size: 20 threads by default
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            // Maximum pool size: 20 threads by default
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            // Unbounded blocking queue
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_")
            // No handler is passed, so the default rejection policy applies
    );

    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}
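One consequence of pairing a ThreadPoolExecutor with an unbounded LinkedBlockingQueue is worth seeing in isolation: the executor only creates threads beyond the core size when the queue refuses a task, and an unbounded queue never does. The sketch below uses only the JDK; the class name `UnboundedQueuePoolSketch` and the numbers in `main` are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative only: measures how far a pool with an unbounded queue grows.
class UnboundedQueuePoolSketch {
    // Submits `tasks` blocking tasks and returns the largest pool size reached.
    static int largestPoolSize(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 1000 * 60, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = pool.getLargestPoolSize();
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return largest;
    }

    public static void main(String[] args) {
        // 20 blocking tasks, core size 2, maximum size 8.
        System.out.println(largestPoolSize(2, 8, 20));
    }
}
```

With core and maximum both at their default of 20, as in the constructor above, the distinction does not matter; it only becomes visible when the two values are configured differently.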
Submitting consume requests

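Unlike concurrent consumption, orderly consumption does not take its messages from the explicit msgs argument.
Messages are consumed from processQueue.
If a ConsumeRequest already exists, there is no need to build another one.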

public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
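    // Build and submit a new ConsumeRequest only if processQueue holds messages after the add
    // and no ConsumeRequest is already processing it
    if (dispathToConsume) {
        // consumeExecutor consumes messages directly from processQueue
        // If an older ConsumeRequest is still running, a new one is unnecessary: both would work on the
        // same processQueue, which holds every message, so a single ConsumeRequest is enough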
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }
}
Executing consumption

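ConsumeMessageOrderlyService.ConsumeRequest
The consumeExecutor thread pool runs ConsumeRequest.run.
The pool is multi-threaded, but given how submitConsumeRequest works, a queue is generally processed by a single thread.
Ordering is guaranteed by locks, not by the number of threads.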

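Orderly consumption is driven by processQueue:
Get the batch size
Take a batch of that size from processQueue, ordered by offset
Run the before hook
Lock at the message queue (messageQueue) level and at the process queue (processQueue) level
Consume
Run the after hook
Handle the retry logic [blocking]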
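The claim that ordering comes from locks rather than thread count can be illustrated with a per-queue lock table. This is a minimal sketch under stated assumptions: `QueueLockSketch`, the `String` queue key, and the counter are hypothetical, and the real lock table is keyed by MessageQueue. Several threads work on the same queue, yet the shared state is only ever touched by one of them at a time.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified per-queue lock table; not the RocketMQ class.
class QueueLockSketch {
    private final ConcurrentMap<String, Object> lockTable = new ConcurrentHashMap<>();
    private int unsafeCounter = 0; // deliberately not atomic

    // Returns the single lock object for a queue key, creating it on first use.
    Object fetchLockObject(String queueKey) {
        return lockTable.computeIfAbsent(queueKey, key -> new Object());
    }

    // Runs `threads` workers that each perform `perThread` increments for one queue.
    int consumeConcurrently(String queueKey, int threads, int perThread) {
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    synchronized (fetchLockObject(queueKey)) {
                        unsafeCounter++; // serialized by the queue lock
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return unsafeCounter;
    }

    public static void main(String[] args) {
        System.out.println(new QueueLockSketch().consumeConcurrently("TopicA-0", 4, 10000));
    }
}
```

Because every worker synchronizes on the same object for the same queue key, no increment is lost even though the counter itself is not thread-safe.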

 class ConsumeRequest implements Runnable {
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;

    // ...... other code omitted

    @Override
    public void run() {
        if (this.processQueue.isDropped()) {
            log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
            return;
        }
        // Fetch the lock object for the current messageQueue
        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
        
        synchronized (objLock) {
            if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (
                    this.processQueue.isLocked() && !this.processQueue.isLockExpired()
            )
            ) {
                final long beginTime = System.currentTimeMillis();
                for (boolean continueConsume = true; continueConsume; ) {
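                    // ...... other code omitted
                    // Get the batch size
                    final int consumeBatchSize =
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
                    // The default is usually one message per batch.
                    // processQueue stores messages as: TreeMap<Long, MessageExt> msgTreeMap
                    // The map is ordered by message offset, so the messages taken here come out in ascending offset order
                    List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);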
                    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                    if (!msgs.isEmpty()) {
                        final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
                        ConsumeOrderlyStatus status = null;
                        // ...... code omitted: run the before hook
                         
                        long beginTimestamp = System.currentTimeMillis();
                        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                        boolean hasException = false;
                        try {
                            // Lock at the processQueue level
                            this.processQueue.getLockConsume().lock();
                            if (this.processQueue.isDropped()) {
                                log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                break;
                            }
                            // Consume
                            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                        } catch (Throwable e) {
                            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                    RemotingHelper.exceptionSimpleDesc(e),
                                    ConsumeMessageOrderlyService.this.consumerGroup,
                                    msgs,
                                    messageQueue);
                            hasException = true;
                        } finally {
                            this.processQueue.getLockConsume().unlock();
                        }
                        // ...... other code omitted
                        long consumeRT = System.currentTimeMillis() - beginTimestamp;
                        // ...... other code and after-hook execution omitted
                        // Record consume RT statistics
                        ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                                .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
                        // Handle the retry logic [blocking]
                        continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                    } else {
                        continueConsume = false;
                    }
                }
            } else {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    return;
                }
                // Retry consumption later
                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
            }
        }
    }
}
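The in-code remark that a TreeMap keyed by offset yields messages in ascending order can be checked with a few lines. The sketch below is an assumption-laden simplification: `OrderedBufferSketch`, its `put` and `take` methods, and the `String` bodies are hypothetical and are not the processQueue API. It removes up to a batch of entries from the head of the sorted map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified offset-ordered buffer; not the RocketMQ class.
class OrderedBufferSketch {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();

    synchronized void put(long offset, String body) {
        msgTreeMap.put(offset, body);
    }

    // Removes and returns up to batchSize messages, smallest offset first.
    synchronized List<String> take(int batchSize) {
        List<String> result = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            Map.Entry<Long, String> entry = msgTreeMap.pollFirstEntry();
            if (entry == null) {
                break; // buffer drained
            }
            result.add(entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        OrderedBufferSketch buffer = new OrderedBufferSketch();
        buffer.put(7L, "m7");
        buffer.put(5L, "m5");
        buffer.put(6L, "m6");
        System.out.println(buffer.take(2));
        System.out.println(buffer.take(2));
    }
}
```

Insertion order does not matter here: the map sorts by key, so each batch starts at the smallest remaining offset.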
ConsumeMessageConcurrentlyService: concurrent consumption

Consumer service constructor

consumeExecutor runs ConsumeRequest tasks (Runnable) to consume messages.
If the pool rejects a task, submitConsumeRequestLater resubmits it to the consumer thread pool later.

  public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerConcurrently messageListener) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListener;

        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
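        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
        // Consumer thread pool parameters
        this.consumeExecutor = new ThreadPoolExecutor(
            // Core pool size: 20 by default
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            // Maximum pool size: 20 by default
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            // Unbounded blocking queue by default
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_")
            // No handler is passed, so the default rejection policy applies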
        );
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
    }
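The reject-then-resubmit idea can be sketched with plain JDK executors. Everything named here is an assumption for illustration: `RetryLaterSketch`, `submitOrRetryLater`, the 20 ms delay, and the single-thread pool with a SynchronousQueue (chosen only to force a rejection) are not taken from RocketMQ. A rejected task is handed to a scheduler, which tries the pool again after a delay.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper showing "submit now, retry later on rejection".
class RetryLaterSketch {
    static void submitOrRetryLater(ExecutorService pool, ScheduledExecutorService scheduler,
                                   Runnable task, long delayMillis) {
        try {
            pool.execute(task);
        } catch (RejectedExecutionException e) {
            // The pool is saturated: try again after the delay.
            scheduler.schedule(() -> submitOrRetryLater(pool, scheduler, task, delayMillis),
                    delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Returns true if a task rejected at first eventually ran.
    static boolean demo() {
        // One thread and no queue capacity, so a second task is rejected while the first runs.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch firstMayFinish = new CountDownLatch(1);
        CountDownLatch secondRan = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                firstMayFinish.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        submitOrRetryLater(pool, scheduler, secondRan::countDown, 20);
        firstMayFinish.countDown();
        boolean ran = false;
        try {
            ran = secondRan.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        pool.shutdown();
        return ran;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that a pool backed by an unbounded queue, like the one in the constructor above, accepts tasks as long as it is running, so in that configuration a rejection is mostly a shutdown-time concern.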
Submitting consume requests

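Consumption is delegated to the consumeExecutor thread pool.
The pulled msgs are split into batches and each batch is submitted to the pool directly.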

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    // Configured maximum number of locally buffered messages passed to the listener per call; the default is usually 1
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // 32 messages are pulled by default, but only 1 is consumed per request
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
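The batching loop above boils down to slicing one list into fixed-size chunks. A minimal sketch, assuming a hypothetical `BatchSplitSketch.split` helper that is not part of RocketMQ, makes the arithmetic easy to check: 32 pulled messages with a batch size of 1 become 32 requests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits pulled messages into consume batches.
class BatchSplitSketch {
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < msgs.size(); start += batchSize) {
            int end = Math.min(start + batchSize, msgs.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(msgs.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> pulled = new ArrayList<>();
        for (int i = 0; i < 32; i++) {
            pulled.add(i);
        }
        System.out.println(split(pulled, 1).size());
        System.out.println(split(pulled, 10));
    }
}
```

Raising the batch size reduces the number of tasks queued on the pool, at the cost of a larger unit of retry when the listener reports a failure.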
Executing consumption

ConsumeMessageConcurrentlyService.ConsumeRequest
The consumeExecutor thread pool runs ConsumeRequest.run.

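ConsumeRequest.msgs is consumed directly; no ordering has to be maintained:
Skip consumption if the queue was dropped by rebalancing or a related policy
Get the business listener
Run the before hook
Run the consume logic [one batch, 1 message by default]
Run the after hook
Handle retries according to ackIndex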

class ConsumeRequest implements Runnable {
    
    private final List<MessageExt> msgs;
    // The pulled messages are also stored in processQueue.msgTreeMap
    private final ProcessQueue processQueue;
    // The message queue that the messages in processQueue were pulled from
    private final MessageQueue messageQueue;

    public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
        this.msgs = msgs;
        this.processQueue = processQueue;
        this.messageQueue = messageQueue;
    }

    public List<MessageExt> getMsgs() {
        return msgs;
    }

    public ProcessQueue getProcessQueue() {
        return processQueue;
    }


    @Override
    public void run() {
        // The queue was dropped by rebalancing or a related policy, so there is nothing to consume
        if (this.processQueue.isDropped()) {
            log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
            return;
        }
        // Get the business listener
        MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
        ConsumeConcurrentlyStatus status = null;
        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

        ConsumeMessageContext consumeMessageContext = null;
        // Run the before hook
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
            consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
            consumeMessageContext.setProps(new HashMap<String, String>());
            consumeMessageContext.setMq(messageQueue);
            consumeMessageContext.setMsgList(msgs);
            consumeMessageContext.setSuccess(false);
            // Run the hooks before consuming
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
        }

        long beginTimestamp = System.currentTimeMillis();
        boolean hasException = false;
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
        try {
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt msg : msgs) {
                    MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                }
            }
            // Run the consume logic on one batch (1 message by default)
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                RemotingHelper.exceptionSimpleDesc(e),
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            hasException = true;
        }
        long consumeRT = System.currentTimeMillis() - beginTimestamp;
        if (null == status) {
            if (hasException) {
                returnType = ConsumeReturnType.EXCEPTION;
            } else {
                returnType = ConsumeReturnType.RETURNNULL;
            }
        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
            returnType = ConsumeReturnType.TIME_OUT;
        } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
            returnType = ConsumeReturnType.FAILED;
        } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
            returnType = ConsumeReturnType.SUCCESS;
        }

        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
        }

        if (null == status) {
            log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.setStatus(status.toString());
            consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
            // Run the after hook
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
        }

        ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
            .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

        // Handle retries according to ackIndex [retry_consume_topic]
        if (!processQueue.isDropped()) {
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
        }
    }

    public MessageQueue getMessageQueue() {
        return messageQueue;
    }

}
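The chain of conditions that derives the return type in run() is easier to reason about as a pure function. The sketch below mirrors that chain under stated assumptions: `ReturnTypeSketch`, its nested enums, and the `classify` method are hypothetical stand-ins that reuse the constant names shown above, and the timeout is taken in minutes as the `* 60 * 1000` conversion suggests.

```java
// Hypothetical stand-in mirroring the return-type decision in run().
class ReturnTypeSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    enum ReturnType { SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED }

    static ReturnType classify(Status status, boolean hasException,
                               long consumeRtMillis, long timeoutMinutes) {
        if (status == null) {
            // The listener either threw or returned null.
            return hasException ? ReturnType.EXCEPTION : ReturnType.RETURNNULL;
        }
        if (consumeRtMillis >= timeoutMinutes * 60 * 1000) {
            // The timeout check comes before the status check, as in run().
            return ReturnType.TIME_OUT;
        }
        if (status == Status.RECONSUME_LATER) {
            return ReturnType.FAILED;
        }
        return ReturnType.SUCCESS;
    }

    public static void main(String[] args) {
        System.out.println(classify(null, true, 10, 15));
        System.out.println(classify(Status.CONSUME_SUCCESS, false, 16L * 60 * 1000, 15));
        System.out.println(classify(Status.RECONSUME_LATER, false, 10, 15));
    }
}
```

One detail this makes visible: a listener that returns CONSUME_SUCCESS after exceeding the timeout is reported as TIME_OUT in the hook context, while the status passed on to processConsumeResult is unchanged.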
Please credit the source when reprinting: this article is reprinted from www.mshxw.com
Article URL: https://www.mshxw.com/it/729270.html