消息数据结构发送模式生产者启动流程消息发送流程
1. 消息长度验证2. 查找主题路由信息3. 选择消息队列
重试机制选择方式
不启用Broker故障延迟机制启用~ 4. 消息发送
消息数据结构public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
private String topic;//必填
private int flag;
private Map properties;
private byte[] body;//必填
private String transactionId;
}
tag:消息TAG,用于消息过滤。
key: Message索引键,多个用空格隔开,RocketMQ可以根据这些key快速检索到消息。
同步异步oneway(类似kafka的发后即忘) 生产者启动流程
public interface MQAdmin{
}
public interface MQProducer extends MQAdmin{
}
//Client Common configuration
public class ClientConfig{
}
public class DefaultMQProducer extends ClientConfig implements MQProducer{
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
}
public class DefaultMQProducerImpl implements MQProducerInner {
}
DefaultMQProducer是默认的消息生产者实现类,本期只分析
- 检查productGroup名是否符合规范(空、长度、非法字符等);设instanceName为进程ID(可能有问题)创建MQClientInstance实例。整个JVM实例中只存在一个MQClientManager实例,维护一个缓存表ConcurrentMap
clientId为客户端IP+instance+(unitname可选)
MQClientInstance封装了RocketMQ网络处理API,是Producer、Consumer与NameServer、Broker打交道的网络通道。
- 向MQClientInstance注册,将当前生产者加入到MQClientInstance管理中,方便后续调用网络请求、进行心跳检测等。启动MQClientInstance,如果MQClientInstance已经启动,则本次启动不会真正执行。
主要步骤:验证消息=》查找路由=》消息发送(包含异常处理机制)
1. 消息长度验证默认4mb
2. 查找主题路由信息真正写入是写入具体的queue
所以该阶段需要获取是master的broker具体ip,从而获取所有可写入的队列,供下一步选择。
此时生产者只知道topic名
- 先从缓存找没有再向NameServer根据topic名找TopicPublishInfo,更新本地缓存元数据转换为可用数据循环遍历路由信息的QueueData信息,找到有写权限的根据broker名找到brokerData信息,找到Master节点根据写队列个数,根据topic+序号创建MessageQueue,填充topicPublishInfo的List< QueueMessage >完成
消息发送采用重试机制,可指定次数
同步重试
异步重试:在收到消息发送结构后、执行回调之前进行重试
选择方式 不启用Broker故障延迟机制默认。sendLatencyFaultEnable=false。此时为轮询选择。
联系第2篇NameServer的设计,producer不能获取到最新的路由信息。因此从broker故障到producer获取到最新路由之前,每次都要轮询这些没有意义的queue,带来了不必要的损耗。
启用~用ConcurrentHashMap存故障信息,brokerName为key
ConcurrentHashMapclass FaultItem implements Comparable { //brokerName private final String name; //当前消息发送耗时 private volatile long currentLatency; private volatile long startTimestamp; } long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L,180000L, 600000L};
规避时长的计算:
isolation=true:30s
false:根据currentLatency从latencyMax中从后往前找第一个小于自己的,获取下标后在notAvailableDuration找到对应规避时长
true和false不能指定,使用时机待深入学习
4. 消息发送private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,//SYNC,ASYNC,ONEWAY
final SendCallback sendCallback,//异步消息回调函数
final TopicPublishInfo topicPublishInfo,//主题路由信息
final long timeout)//消息发送超时时间
- 根据MessageQueue中的brokerName,从brokerAddrTable获取Broker的地址。如果未缓存则从NameServer拉一下该topic的最新路由数据。如果还是找不到Broker信息,则抛出MQClientException,提示Broker不存在。为消息分配全局唯一ID,设置消息flag
如果消息体默认超过4K,会对消息体采用zip压缩,并设置消息的系统标记(int型)为MessageSysFlag.COMPRESSED_FLAG。如果是事务消息,则设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED_TYPE。
- 如果注册了SendMessageHook,则执行sendMessageBefore()。
public interface SendMessageHook {
String hookName();
void sendMessageBefore(final SendMessageContext context);
void sendMessageAfter(final SendMessageContext context);
}
构建消息发送请求包。包含一些重要信息:生产者组、主题名称、默认创建主题Key、消息发送时间等。
根据消息发送方式,同步、异步、单向方式进行网络传输
同步:
检查消息发送是否合理
检查该Broker是否有写权限。
检查该Topic是否可以进行消息发送。(主要针对默认主题,默认主题不能发送消息,仅仅供路由查找)
在NameServer端存储主题的配置信息,默认路径:${ROCKET_HOME}/store/config/topic.json。
主题存储信息:
order:是否是顺序消息;perm:权限码;readQueueNums:读队列数量;writeQueueNums:写队列数量;topicName:主题名称;topicSysFlag:topicFlag,当前版本暂为保留;topicFilterType:主题过滤方式,当前版本仅支持SINGLE_TAG
检查队列,如果队列不合法,返回错误码。
如果消息重试次数超过允许的最大重试次数,消息将进入到延迟队列(DLQ)。延迟队列主题:%DLQ%+消费组名,延迟队列在消息消费时将重点讲解。
调用DefaultMessageStore#putMessage进行消息存储。关于消息存储的实现细节将在第4章重点剖析。
异步:调用发送api后,不阻塞等待,只需要提供一个回调函数(SendCallback),供消息发送客户端在收到响应结果回调。并发控制默认值为65535。
异步消息发送虽然也可以通过DefaultMQProducer#retryTimesWhenSendAsyncFailed属性来控制消息重试次数,但是重试的调用入口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等将不会重试。
oneway:可以看到oneway和同步发送一样没有传入callback函数
进入MQClientAPIImpl#sendMessage方法,三种发送方式实际上都是调用该方法。下面截取的部分代码说明了只有同步发送会返回结果。
switch (communicationMode) {
case ONEWAY:
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
- 如果注册了SendMessageHook,执行sendMessageAfter()。注意,就算消息发送过程中发生Exception时该方法也会执行。



