栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

9. 源码分析之消息消费

9. 源码分析之消息消费

源码分析之消息消费 Rebalance(针对集群消费模式)

(1)消费Group下的所有消费者

(2)Topic的所有Queue队列

(3)Queue分配策略

触发时机

(1)消费者启动

(2)消费者加入或者退出消费组

(3)定时触发Rebalance(10s)

举例

​ 假设,一个topic中有4个队列,有一个Producer往4个队列中发数据,在集群消费中,在一个消费者分组中如果只有一个消费者。那么这个消费者肯定会消费4个队列,不然就会漏数据。

​ 如果加入了一个Consumer2,这个时候就会触发一个Rebalance(Consumer增加了触发),这个2个消费者平均消费4个队列。

如果再加入了一个Consumer3,这个是否平均分不了,一般的处理,默认情况下,Consumer1消费两个,其他的消费一个。

如果再加入了一个Consumer4,刚好一对一,所以每个 Consumer消费一个队列。

​ 如果再加入了一个Consumer5,消费者数据大于队列,那么Consumer5就消费不了数据,除非队列增加了,或者是说Consumer减少了才行。

所以当你启动多个消费者,如果消费者数量大于queue的数量,也只能有queue数量的消费者消费(就跟在软件公司内部找女朋友一样,狼多肉少)

蛋糕都被吃完了,你没得吃了。这个其实就是消费并发度。消费并发度决定因素是queue的数量。

源码解读

这里讲到的是基于推模式的消费,也就是我们常用的消费模式。(队列推)

consumer.start(); // 启动消费者
public void start() throws MQClientException {
    setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    this.defaultMQPushConsumerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}
// DefaultMQPushConsumerImpl.start()
// TODO 消费者的核心代码入口
public synchronized void start() throws MQClientException {
    switch (this.serviceState) {//刚刚创建
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            //TODO 0.1.检查配置信息
            this.checkConfig();
            //TODO 0.2.加工订阅信息(同时,如果消息消费模式为集群模式,还需要为该消费组创建一个重试主题。)
            this.copySubscription();
            //TODO 0.3.创建MQClientInstance实例,
            //TODO 这个实例在一个JVM中消费者和生产者共用,
            //MQClientManager中维护了一个factoryTable,类型为ConcurrentMap,保存了clintId和MQClientInstanc
            //todo MQClientInstance中就会维护

            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(
                this.defaultMQPushConsumer, this.rpcHook);
            //TODO 1.1.负载均衡(队列默认分配算法)
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            //TODO 1.2.队列默认分配算法
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(
                this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
            //TODO 5. 拉取消息(无论是拉模式,还是推模式 :数据都是拉),
            //TODO pullAPIWrapper拉取消息的API包装类,主要有消息的拉取方法和接受拉取到的消息
            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
            //TODO 7.消费进度存储,如果是集群模式,
            //使用远程存储RemoteBrokerOffsetStore,如果是广播模式,则使用本地存储LocalFileOffsetStore
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
            }
            //TODO 8.加载消息进度(offsetStore是用来操作消费进度的对象)
            //TODO push模式消费进度最后持久化在broker端,但是consumer端在内存中也持有消费进度
            this.offsetStore.load();
            //TODO 9.判断是顺序消息还是并发消息
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
            }
            //TODO 10.消息消费服务并启动
            this.consumeMessageService.start();
            //TODO 11.注册消费者
            boolean registerOK = mQClientFactory.registerConsumer(
                this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
            }
            //TODO 12.MQClientInstance启动(第3步中创建了MQClientInstance)
            mQClientFactory.start(); // here
            log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
        default:
            break;
    }
    //TODO 13.更新TopicRouteData
    this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    //TODO 14.检测broker状态
    this.mQClientFactory.checkClientInBroker();
    //TODO 15.发送心跳
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    //TODO 16.重新负载
    this.mQClientFactory.rebalanceImmediately();
}
// org.apache.rocketmq.client.impl.factory.MQClientInstance#start
// 还是要进入MQClientInstance.start()方法
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start(); // here
                // Start various schedule tasks
                //TODO 12.1 定时任务
                this.startScheduledTask();
                //TODO 12.2 开启拉消息服务(线程)
                this.pullMessageService.start(); // here
                //TODO 12.3 负载均衡服务(线程)
                this.rebalanceService.start(); // here
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
            default:
                break;
        }
    }
}

​ 在MQClientInstance.start()方法,有一个线程RebalanceService 就是锁Rebalance。具体实现RebalanceService来做的,是我们来看下。

RebalanceService
// org.apache.rocketmq.common.ServiceThread#start
public void start() {
    if (!started.compareAndSet(false, true)) {
        return;
    }
    stopped = false;
    this.thread = new Thread(this, getServiceName());
    this.thread.setDaemon(isDaemon);
    this.thread.start();
}
// org.apache.rocketmq.client.impl.consumer.RebalanceService#run
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance(); // here
    }
    log.info(this.getServiceName() + " service end");
}
// org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance
public void doRebalance() {
    for (Map.Entry entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance(); // here
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}
// org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#doRebalance
public void doRebalance() {
    if (this.rebalanceImpl != null) {
        this.rebalanceImpl.doRebalance(false); // here
    }
}
// org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
public void doRebalance(final boolean isOrder) {
    Map subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder); // here
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    this.truncateMessageQueueNotMyTopic();
}

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            Set mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                }
            } else {}
            break;
        }
        case CLUSTERING: {
            Set mqSet = this.topicSubscribeInfoTable.get(topic);
            List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {}
            }

            if (null == cidAll) {}

            if (mqSet != null && cidAll != null) {
                List mqAll = new ArrayList();
                mqAll.addAll(mqSet);
                //todo 这里要排序  , 确保每个客户端都是一致的
                Collections.sort(mqAll);
                Collections.sort(cidAll);

                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; // todo

                List allocateResult = null;
                try {
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    return;
                }

                Set allocateResultSet = new HashSet();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }

                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}

​ 这里有一个针对MessageQueue的排序。为什么这么设计。如果同一个分组的多个客户端,分布在不同的机器上(消费者的机器上),每台客户端都单独算,并且算出来的效果是一致的。

​ 总体消费就是让每一个 Consumer有同样的一个MessageQueue的视图,因为每个消费者的视图是一致的,那么在每个客户端算负载,算出来的结果当然就是一致的。这样就能保障之前的负载均衡的算出之前的效果。

​ 对于 Consumer1和Consumer2,经过统一的排序,在Consumer1客户端也好,还是Consumer2的客户端也好,算出来的结果是一致的。

Consumer1消费 queue1和queue2。Consumer2消费queue3和queue4。

​ 对比Kafka,在消费的时候依赖Zookeeper,broker变动还要走选举之类,如果选不出或者比较卡,这个是会导致负载不正常,负载不成功就不能正常的工作。

​ 而RocketMQ的这种方式简单,并且高可用。

​ 强一致性必定要牺牲高可用性,RocketMQ****设计上更多偏向高可用。

消费者源码解读

​ 在消费的时候有两种模式,一个是并发消费,另外一种是顺序消费。

// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerOrderly() {
    Random random = new Random();

    @Override
    public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
        context.setAutoCommit(true);
    }
}
并发消费

三个角色:消费者Consumer、 Borker、NameServer

NameServer主要记录了Borker上有哪些Topic。

  • 在消费者启动之后,第一步都要从NameServer中获取Topic相关信息。

这一步设计到组件之间的交互,RocketMQ使用功能号来设计的。

GET_ROUTEINFO_BY_TOPIC

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
                                                      boolean allowTopicNotExist) {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
    //todo 生产者、消费者向NameServer获取路由信息
    RemotingCommand request = RemotingCommand.
        createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.TOPIC_NOT_EXIST: {
            break;
        }
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }
    throw new MQClientException(response.getCode(), response.getRemark());
}
  • 消费者拿到topic相关信息之后,第2步需要知道Topic中有哪些queue,并且消费的时候还跟消费者分组相关。所以这里就需要根据group获取相关信息。(这里有定时触发<默认10s一次>,同时在消费者启动的时候也会主动触发一次)

功能号:GET_CONSUMER_LIST_BY_GROUP

// org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            Set mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}",
                             consumerGroup,
                             topic,
                             mqSet,
                             mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        case CLUSTERING: {
            Set mqSet = this.topicSubscribeInfoTable.get(topic);
            List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); // here
        }
    }
}
// org.apache.rocketmq.client.impl.factory.MQClientInstance#findConsumerIdList
public List findConsumerIdList(final String topic, final String group) {
    String brokerAddr = this.findBrokerAddrByTopic(topic);
    if (null == brokerAddr) {
        this.updateTopicRouteInfoFromNameServer(topic);
        brokerAddr = this.findBrokerAddrByTopic(topic);
    }

    if (null != brokerAddr) {
        try {
            return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000); // here
        } catch (Exception e) {}
    }
    return null;
}
// org.apache.rocketmq.client.impl.MQClientAPIImpl#getConsumerIdListByGroup
public List getConsumerIdListByGroup(
    final String addr,
    final String consumerGroup,
    final long timeoutMillis) {
    GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup);
    //todo 获取Group的ConsumerList
    RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(
        MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            if (response.getBody() != null) {
                GetConsumerListByGroupResponseBody body =
                    GetConsumerListByGroupResponseBody.decode(
                    response.getBody(), GetConsumerListByGroupResponseBody.class);
                return body.getConsumerIdList(); // here
            }
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
  • 当我们拿到了消费者Group下的所有信息之后,这个就可以做分配,可以分配到比如自己这台消费者的应该要消费哪些主机上的哪些队列。

这个地方叫DoRebalance,同时这个DoRebalacne之前已经细讲(具体这里不细讲)

// org.apache.rocketmq.client.impl.consumer.RebalanceService#run
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}

public void doRebalance() {
    for (Map.Entry entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {}
        }
    }
}
  • 确定了消费者的group、topic、还有queue之后,还需要知道从哪个位置开始消费。于是还需要获取Queue的Offset

    功能号:QUERY_CONSUMER_OFFSET

// org.apache.rocketmq.client.impl.MQClientAPIImpl#queryConsumerOffset
public long queryConsumerOffset(
    final String addr,
    final QueryConsumerOffsetRequestHeader requestHeader,
    final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
    //todo 获取Queue的消费Offset
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);

    RemotingCommand response = this.remotingClient
        .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
                                                              request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            QueryConsumerOffsetResponseHeader responseHeader = (QueryConsumerOffsetResponseHeader) response
                .decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
            return responseHeader.getOffset();
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

调用的地方RemoteBrokerOffsetStore类中fetchConsumeOffsetFromBroker

// RemoteBrokerOffsetStore#fetchConsumeOffsetFromBroker
private long fetchConsumeOffsetFromBroker(MessageQueue mq) {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());

        return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset( // here
            findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}
  • 确定了消费者的group、topic、还有queue和需要获取Queue的Offset,就要正式开始拉取消息了。

​ 送入的信息:topic、queueid、offset,

​ 还有maxnum(每次拉取多少条消息),

​ suspendtimeout 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒 默认值 20000

功能号:PULL_MESSAGE

  • 拉到消息后,消费者就要进行消息的消费了。消费完了之后,要更新offset,这个时候也要发起调用

功能号:UPDATE_CONSUMER_OFFSET

​ 这个地方要注意有两种方式:

​ 1、 定时,默认5s提交

​ 2、 前面步骤的拉取消息时会带入参数:commitoffset,这个时候也会更新。

  • 最后的话,消费者关闭的话,也会调用

功能号:UNREGISTER_CLIENT

​ 当然,生产者和和Broker之间还有心跳机制,这里就不多说了。

顺序消费

顺序消费的主体步骤和并发消费差不多,主要的差别就是有一个加锁和解锁的过程。

  • 只要确定了是拉哪个queue。这个地方要加锁,加锁的目的就可以达到顺序性。在一个queue中消息是顺序的,当一个消费者确定了一个queue进行消费时,使用一个分布式锁机制,是不是就可以确定这个消费者的顺序性。

​ 加锁Queue

​ LOCK_BATCH_MQ

同时发现,这个地方也有一个定时执行,20s,这个是周期性的去续锁。因为在broker,这把的锁的时间也有一定的失效的,(默认60s),如果超过这个时间,这把锁就释放了。

​ Broker端针对这个的实现就是一个ReentrantLock而已。

  • 解锁Queue

​ UNLOCK_BATCH_MQ

消费中常见问题 重复消息

​ RocketMQ生产也好,消费也好,有重试机制、重发队列等等,所以在网络情况不太好的情况下, RocketMQ避免不了消息的重复。

如果业务需要去重,需要自己做数据处理的幂等性,如果依赖数据库去重可以做到持久化的幂等操作但是会导致数据消费性能下降,数据重复毕竟是低概率事件,我们可以将已经成功消费的数据key缓存在redis两个小时,如果redis有就直接略过,这样提高了性能也保证了数据不重复消费

消费卡死

​ 消费的流程中,尤其是针对顺序消息,我们感觉上会有卡死的现象,由于顺序消息中需要到Broker中加锁,如果消费者某一个挂了,那么在Broker层是维护了60s的时间才能释放锁,所以在这段时间只能等,消费者是消费不了的,在等待锁。

​ 另外如果还有Broker层面也挂了,如果是主从机构,获取锁都是走的Master节点,如果Master节点挂了,走Slave消费,但是slave节点上没有锁,所以顺序消息如果发生了这样的情况,也是会有卡死的现象。

启动之后较长时间才消费

​ 在并发消费的时候,当我们启动了非常多的消费者,维护了非常多的topic的时候、或者queue比较多的时候,你可以看到消费的流程的交互是比较多的(5~6步),要启动多线程,也要做相当多的事情,所以你会感觉要启动较长的时间才能消费。

​ 还有顺序消费的时候,如果是之前的消费者挂了,这个锁要60秒才会释放,也会导致下一个消费者启动的时候需要等60s才能消费

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/630510.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号