栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

rocketmq消息模式(rocketmq消息堆积)

rocketmq消息模式(rocketmq消息堆积)

文章目录

消息数据结构发送模式生产者启动流程消息发送流程

1. 消息长度验证2. 查找主题路由信息3. 选择消息队列

重试机制选择方式

不启用Broker故障延迟机制启用~ 4. 消息发送

消息数据结构
public class Message implements Serializable {
	private static final long serialVersionUID = 8445773977080406428L;
    private String topic;//必填
    private int flag;
    private Map properties;
    private byte[] body;//必填
    private String transactionId;
}

tag:消息TAG,用于消息过滤。
key: Message索引键,多个用空格隔开,RocketMQ可以根据这些key快速检索到消息。

发送模式

同步异步oneway(类似kafka的发后即忘) 生产者启动流程

public interface MQAdmin{
}
public interface MQProducer extends MQAdmin{
}
//Client Common configuration
public class ClientConfig{
}
public class DefaultMQProducer extends ClientConfig implements MQProducer{
	protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
}
public class DefaultMQProducerImpl implements MQProducerInner {
}

DefaultMQProducer是默认的消息生产者实现类,本期只分析

    检查productGroup名是否符合规范(空、长度、非法字符等);设instanceName为进程ID(可能有问题)创建MQClientInstance实例。整个JVM实例中只存在一个MQClientManager实例,维护一个缓存表ConcurrentMapfactoryTable

clientId为客户端IP+instance+(unitname可选)

MQClientInstance封装了RocketMQ网络处理API,是Producer、Consumer与NameServer、Broker打交道的网络通道。

    向MQClientInstance注册,将当前生产者加入到MQClientInstance管理中,方便后续调用网络请求、进行心跳检测等。启动MQClientInstance,如果MQClientInstance已经启动,则本次启动不会真正执行。
消息发送流程

主要步骤:验证消息=》查找路由=》消息发送(包含异常处理机制)

1. 消息长度验证

默认4mb

2. 查找主题路由信息

真正写入是写入具体的queue

所以该阶段需要获取是master的broker具体ip,从而获取所有可写入的队列,供下一步选择。

此时生产者只知道topic名

    先从缓存找没有再向NameServer根据topic名找TopicPublishInfo,更新本地缓存元数据转换为可用数据循环遍历路由信息的QueueData信息,找到有写权限的根据broker名找到brokerData信息,找到Master节点根据写队列个数,根据topic+序号创建MessageQueue,填充topicPublishInfo的List< QueueMessage >完成
3. 选择消息队列 重试机制

消息发送采用重试机制,可指定次数

同步重试

异步重试:在收到消息发送结构后、执行回调之前进行重试

选择方式 不启用Broker故障延迟机制

默认。sendLatencyFaultEnable=false。此时为轮询选择。

联系第2篇NameServer的设计,producer不能获取到最新的路由信息。因此从broker故障到producer获取到最新路由之前,每次都要轮询这些没有意义的queue,带来了不必要的损耗。

启用~

用ConcurrentHashMap存故障信息,brokerName为key

ConcurrentHashMap
class FaultItem implements Comparable {
  //brokerName
  private final String name;
  //当前消息发送耗时
  private volatile long currentLatency;
  private volatile long startTimestamp;
}
long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L,180000L, 600000L};

规避时长的计算:

isolation=true:30s

false:根据currentLatency从latencyMax中从后往前找第一个小于自己的,获取下标后在notAvailableDuration找到对应规避时长

true和false不能指定,使用时机待深入学习

4. 消息发送
private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,//SYNC,ASYNC,ONEWAY
        final SendCallback sendCallback,//异步消息回调函数
        final TopicPublishInfo topicPublishInfo,//主题路由信息
        final long timeout)//消息发送超时时间
    根据MessageQueue中的brokerName,从brokerAddrTable获取Broker的地址。如果未缓存则从NameServer拉一下该topic的最新路由数据。如果还是找不到Broker信息,则抛出MQClientException,提示Broker不存在。为消息分配全局唯一ID,设置消息flag

如果消息体默认超过4K,会对消息体采用zip压缩,并设置消息的系统标记(int型)为MessageSysFlag.COMPRESSED_FLAG。如果是事务消息,则设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED_TYPE。

    如果注册了SendMessageHook,则执行sendMessageBefore()。
public interface SendMessageHook {
    String hookName();
    void sendMessageBefore(final SendMessageContext context);
    void sendMessageAfter(final SendMessageContext context);
}

    构建消息发送请求包。包含一些重要信息:生产者组、主题名称、默认创建主题Key、消息发送时间等。

    根据消息发送方式,同步、异步、单向方式进行网络传输

      同步:

        检查消息发送是否合理

          检查该Broker是否有写权限。

          检查该Topic是否可以进行消息发送。(主要针对默认主题,默认主题不能发送消息,仅仅供路由查找)

          在NameServer端存储主题的配置信息,默认路径:${ROCKET_HOME}/store/config/topic.json。

          主题存储信息:

          order:是否是顺序消息;perm:权限码;readQueueNums:读队列数量;writeQueueNums:写队列数量;topicName:主题名称;topicSysFlag:topicFlag,当前版本暂为保留;topicFilterType:主题过滤方式,当前版本仅支持SINGLE_TAG

          检查队列,如果队列不合法,返回错误码。

        如果消息重试次数超过允许的最大重试次数,消息将进入到延迟队列(DLQ)。延迟队列主题:%DLQ%+消费组名,延迟队列在消息消费时将重点讲解。

        调用DefaultMessageStore#putMessage进行消息存储。关于消息存储的实现细节将在第4章重点剖析。

      异步:调用发送api后,不阻塞等待,只需要提供一个回调函数(SendCallback),供消息发送客户端在收到响应结果回调。并发控制默认值为65535。

      异步消息发送虽然也可以通过DefaultMQProducer#retryTimesWhenSendAsyncFailed属性来控制消息重试次数,但是重试的调用入口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等将不会重试。

      oneway:可以看到oneway和同步发送一样没有传入callback函数
      进入MQClientAPIImpl#sendMessage方法,三种发送方式实际上都是调用该方法。下面截取的部分代码说明了只有同步发送会返回结果。

        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }
    如果注册了SendMessageHook,执行sendMessageAfter()。注意,就算消息发送过程中发生Exception时该方法也会执行。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771491.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号