RocketMQ的记录与学习
一、RocketMQ的介绍 1.1 消息中间件(MessageQueue,MQ) 1.2 消息中间件的使用场景
https://blog.csdn.net/zhangkunkkk/article/details/125451633?csdn_share_tail=%7B%22type%22%3A%22blog%22%2C%22rType%22%3A%22article%22%2C%22rId%22%3A%22125451633%22%2C%22source%22%3A%22zhangkunkkk%22%7D&ctrtid=Tp5o4
2.1 下载从官网下载4.8.0版本:https://rocketmq.apache.org/dowloading/releases/
1.电脑为64位 2.JDK为1.8版本
配置RocketMQ全局环境变量: 变量名:ROCKETMQ_HOME
变量值:E:runtoolsrocketmqrocketmq-4.9.3
编辑 rocketmq-4.9.2binrunserver.cmd
rem set “JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m”
set “JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m”
编辑 rocketmq-4.9.2binrunbroker.cmd
rem set “JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g”
set “JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m”
修改日志配置文件
将两个配置文件添加如下配置,且将原 user.home 替换为 LOG_BASE
logback_broker.xml 内容修改直接如下
u
s
e
r
.
h
o
m
e
/
l
o
g
s
/
r
o
c
k
e
t
m
q
l
o
g
s
/
{user.home}/logs/rocketmqlogs/
user.home/logs/rocketmqlogs/{brokerLogDir}/broker_default.log
logback_namesrv.xml
property name=“LOG_BASE” value=“E:/rocketmqos/rocketmq-4.9.3/” />
${user.home}/logs/rocketmqlogs/namesrv.log
true
cmd命令内容:先启动namesrv、然后启动broker
cd E:rocketmqosrocketmq-4.9.3bin
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
下载并且解压
打开IDEA进行源码的编译
之后创建RocketMQ文件,在下面拉入distribution中的conf,自己创建logs和store,并且配置
之后启动
broker配置
下载地址:https://codeload.github.com/apache/rocketmq-external/zip/master
(自己看网址吧,太多了)
消息队列的发送
public class SyncProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
//实例化消息生产者Producer
DefaultMQProducer producer=new DefaultMQProducer("group_test");
//设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setSendLatencyFaultEnable(true);
//启动Producer实例
producer.start();
for(int i=0;i<10;i++){
//创建消息,并指定Topic,Tag和消息体
Message msg=new Message("TopicTest",
"TagA",
("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//发送消息到一个Broker
SendResult sendResult=producer.send(msg);
System.out.printf("%s%n",sendResult);
}
//如果不在发送消息,关闭Producer
producer.shutdown();
}
}
显示结果
在控制台发现,发送了10条消息,4个队列中总共进入10条消息(初始值为10,9,10,11)
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {
//实例化消息生产者Producer
DefaultMQProducer producer=new DefaultMQProducer("group_test");
//设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
//启动Producer实例
producer.start();
for(int i=0;i<10;i++){
final int index=i;
//创建消息,并指定Topic,Tag和消息体
Message msg=new Message("TopicTest",
"TagA",
("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//SendCallback接受异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%s%n",sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exceptin %s %n",index,e);
e.printStackTrace();
}
});
}
Thread.sleep(10000);
//如果不再发送消息,关闭Producer实例
producer.shutdown();
}
}
结果显示
在控制台发现,发送了10条消息,4个队列中总共进入10条消息(初始值为13,12,12,13)
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//实例化消息生产者Producer
DefaultMQProducer producer=new DefaultMQProducer("group_test");
//设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
//启动Producer实例
producer.start();
for(int i=0;i<10;i++){
//创建消息,并指定Topic,Tag和消息体
Message msg=new Message("TopicTest",
"TagA",
("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
//如果不在发送消息,关闭Producer
producer.shutdown();
}
}
结果显示
发送成功
每个消息只能被处理一次,不会产生消息的重复。可靠性高,对消费者的进度在服务端进行保存。
public class BalanceConsumer{
public static void main(String[] args) throws Exception {
//实例化消息生产者Producer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
//设置NameServer的地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅Topic
consumer.subscribe("TopicTest", "*");
//consumer.setConsumeFromWhere();
//集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
//注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = null;
msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.printf("收到消息:" + "topic:" + topic + ",tags :" + tags + ",msg" + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
//注销Consumer
//consumer.shutdown();
System.out.printf("Consumer Started.%n");
}
}
再创建一个BalanceConsumer2与上述一致
通过同步消息发送消息,两个集群的结果显示
Consumer1
Consumer2
发现发送的10条消息被均摊到两个集群当中
不支持顺序消息,不支持重置消费位点(不支持消费的时候重新从一个偏移量开始消费),消费者进度在客户端,会产生重复的概率会打,一条消息会被多次消费。如果每一次客户端重启,都会从最新的消息上进行处理,在客户端重启的这段时间,客户端不会接收到新增的消息,如果在这段时间有消息发送,则不会被接受处理。
public class BalanceConsumerGB {
public static void main(String[] args) throws Exception {
//实例化消息生产者Producer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
//设置NameServer的地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅Topic
consumer.subscribe("TopicTest", "*");
//consumer.setConsumeFromWhere();
//广播消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = null;
msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + "topic:" + topic + ",tags :" + tags + ",msg" + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
//注销Consumer
//consumer.shutdown();
System.out.printf("Consumer Started.%n");
}
}
再创建一个BalanceConsumerGB2与上述一致
通过同步消息发送消息,集群消费的两个消费者的结果显示
BalanceConsumerGB
BalanceConsumerGB2
发送的10条消息都被发送到各个广播消费者当中
全局顺序消息:只有一个队列,可以确保消息在队列当中的顺序性
部分顺序消息:
全局有序
全局有序比较简单,主要控制在于创建Topic指定只有一个队列,同步确保生产者与消费者都只有一个实例进行即可。
在电商业务场景中,一个订单的流程是:创建、付款、推送、完成。在加入RocketMQ后,一个订单会分别产生对于这个订单的创建、付款、推送、完成等消息,如果我们把所有消息全部送入到RocketMQ中的一个主题中,这里该如何实现针对一个订单的消息顺序性呢!如下图:
要完成分区有序性,在生产者环节使用自定义的消息队列选择策略,确保订单号尾数相同的消息会被先后发送到同一个队列中(案例中主题有3个队列,生产环境中可设定成10个满足全部尾数的需求),然后再消费端开启负载均衡模式,最终确保一个消费者拿到的消息对于一个订单来说是有序的。
部分顺序消息生产者
public class ProducerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
//订单列表
List orderList = new ProducerInOrder().buildOrders();
for (int i = 0; i < orderList.size(); i++) {
String body = orderList.get(i).toString();
Message msg = new Message("PartOrder", null, "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getId()); //订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s" ,
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
private List buildOrders(){
List orderList=new ArrayList<>();
Order orderDemo=new Order();
orderDemo.setDesc("创建");
orderDemo.setId(001);
orderList.add(orderDemo);
orderDemo=new Order();
orderDemo.setDesc("创建");
orderDemo.setId(002);
orderList.add(orderDemo);
orderDemo=new Order();
orderDemo.setDesc("付款");
orderDemo.setId(001);
orderList.add(orderDemo);
orderDemo=new Order();
orderDemo.setDesc("创建");
orderDemo.setId(003);
orderList.add(orderDemo);
orderDemo=new Order();
orderDemo.setDesc("付款");
orderDemo.setId(002);
orderList.add(orderDemo);
orderDemo=new Order();
orderDemo.setDesc("完成");
orderDemo.setId(001);
orderList.add(orderDemo);
orderDemo=new Order();
orderDemo.setDesc("付款");
orderDemo.setId(003);
orderList.add(orderDemo);
orderDemo=new Order();
orderDemo.setDesc("完成");
orderDemo.setId(002);
orderList.add(orderDemo);
orderDemo=new Order();
orderDemo.setDesc("推送");
orderDemo.setId(001);
orderList.add(orderDemo);
orderDemo=new Order();
orderDemo.setDesc("完成");
orderDemo.setId(003);
orderList.add(orderDemo);
orderDemo=new Order();
orderDemo.setDesc("推送");
orderDemo.setId(002);
orderList.add(orderDemo);
orderDemo=new Order();
orderDemo.setDesc("推送");
orderDemo.setId(003);
orderList.add(orderDemo);
return orderList;
}
}
部分顺序消息消费者
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer2");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("PartOrder", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
//可以看到每个queue有唯一的consume线程来消费,订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName()
+ ",queueId=" + msg.getQueueId() + ",content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
} catch (Exception e) {
e.printStackTrace();
//这个点要注意:意思是先等一会,一会再处理这批消息,而不是放到重试队列里
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
结果显示
注意事项
使用顺序消息:首先要保证消息是有序进入MQ的,消息放入MQ之前,对id等关键字进行取模,放入指定messageQueue,同时consume消费消息失败时,不能返回reconsume——later,这样会导致乱序,所以应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。
六、延时消息 延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递(被消费者消费),而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。
消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时向RocketMQ发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。
6.2 使用案例Apache RocketMQ目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。(RocketMQ的商业版本Aliware MQ提供了任意时刻的定时消息功能,Apache的RocketMQ并没有,阿里并没有开源)
Apache RocketMQ发送延时消息是设置在每一个消息体上的,在创建消息时设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
延迟消息的level,区分18个等级:level为1,表示延迟1秒后消费;level为2表示延迟5秒后消费;level为3表示延迟10秒后消费;以此类推;最大level为18表示延迟2个小时消费。具体标识如下:
| level | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
|---|---|---|---|---|---|---|---|---|---|
| 延迟 | 1s | 5s | 10s | 30s | 1m | 2m | 3m | 4m | 5s |
| level | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 |
|---|---|---|---|---|---|---|---|---|---|
| 延迟 | 6m | 7m | 8m | 9m | 10m | 20m | 30m | 1h | 2h |
是这生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的level即可。消费消息跟普通的消费消息一致。
延时消息生产者
public class ScheduledProducer {
public static void main(String[] args) throws Exception{
//实例化一个生产者来产生延时消息
DefaultMQProducer producer=new DefaultMQProducer("ScheduledProducer");
//设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
//启动producer实例
producer.start();
int totalMessageToSend=10;
for(int i=0;i
Message message=new Message("ScheduledTopic",("Hello scheduled message"+i).getBytes());
//设置延时等级4,这个消息将在30s之后投递给消费者
//秒1 5 10 30
//分1 2 3 4 5 6 7 8 9 10 20 30
//时1 2
message.setDelayTimeLevel(4);
//发送消息
producer.send(message);
}
//关闭生产者
producer.shutdown();
}
}
延时消息消费者
public class ScheduledConsumer {
public static void main(String[] args) throws Exception{
//实例化消费者
//实例化一个生产者来产生延时消息
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("ScheduledProducer");
//设置NameServer的地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅Topics
consumer.subscribe("ScheduledTopic","*");
//注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
for(MessageExt msg:msgs){
//打印消息消费延迟
System.out.println("Receive message[msgId="+msg.getMsgId()+"]"
+(msg.getStoreTimestamp()-msg.getBornTimestamp())+"mslater");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
结果显示,消费者中接收到生产者发送的数据
在高并发场景中,批量发送消息能显著提高传递消息发送时的性能(减少网络连接及IO的开销)。使用批量消息时的限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK(集群时会细讲),且不能是延时消息。
在发送批量消息时先构建一个消息对象集合,然后调用send(Collection msg)系列的方法即可。由于批量消息的4MB限制,所以一般情况下在集合中添加消息需要先计算当前集合中消息对象的大小是否超过限制,如果超过限制也可以使用分割消息的方式进行多次批量发送。
消费者
public class BatchConsumer {
public static void main(String[] args) throws Exception{
//实例化消费者
//实例化一个生产者来产生延时消息
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("BatchConsumer");
//设置NameServer的地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅Topics
consumer.subscribe("BatchTest","*");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
//注册回调函数处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages:%s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
生产者
public class BatchProducerNoSplit {
public static void main(String[] args) throws Exception{
//实例化消息生产者Producer
DefaultMQProducer producer=new DefaultMQProducer("BatchProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String topic="BatchTest";
List message=new ArrayList<>();
message.add(new Message(topic,"Tag","OrderID001","Hello world 0".getBytes()));
message.add(new Message(topic,"Tag","OrderID002","Hello world 0".getBytes()));
message.add(new Message(topic,"Tag","OrderID003","Hello world 0".getBytes()));
try{
producer.send(message);
}catch (Exception e){
producer.shutdown();
e.printStackTrace();
}
//不发送消息关闭Producer
producer.shutdown();
}
}
7.2 批量切分发送
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割,案例中以1M大小进行消息分割。
我们需要发送10万元素的数组,这个量很大,怎么快速发送完。使用批量发送,同时每一批控制在1M左右确保不超过消息大小限制。
生产者
public class BatchProducerSplit {
public static void main(String[] args) throws Exception {
//实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String topic = "BatchTest";
List messages = new ArrayList<>(100 * 1000);
//十万像素数组
for (int i = 0; i < 100 * 1000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world" + i).getBytes()));
}
//把最大的消息分裂成若干个小的消息(1M左右)
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List listItem = splitter.next();
producer.send(listItem);
Thread.sleep(100);
}
//不发送消息关闭Producer
producer.shutdown();
System.out.println("Consumer Started.%n");
}
}
ListSplitter
public class ListSplitter implements Iterator八、过滤消息> { private int sizeLimit = 1000 * 1000; //1M private final List
messages; private int currIndex; public ListSplitter(List messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List next() { int nextIndex = currIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map properties = message.getProperties(); for (Map.Entry entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; //增加日志的开销20字节 if (tmpSize > sizeLimit) { if (nextIndex - currIndex == 0) { //单个消息超过了最大的限制(1M),否则会阻塞进程 nextIndex++; //加入下一个子列表没有元素,则添加这个子列表然后退出循环,否则退出循环 } break; } if (tmpSize + totalSize > sizeLimit) { break; } else { totalSize += tmpSize; } } List subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } @Override public void remove() { throw new UnsupportedOperationException("Not allowed to remove"); } }
在实际的开发应用中,对于一类消息尽可能使用一个Topic进行存储,但在消费时需要选择您想要的消息,这时可以使用RocketMQ的消息过滤功能,具体实现是利用消息的Tag和Key。
Key一般用于消息在业务层面的唯一标识。对发送的消息设置好 Key,以后可以根据这个 Key 来查找消息。比如消息异常,消息丢失,进行查找会很方便。RocketMQ 会创建专门的索引文件,用来存储 Key与消息的映射,由于底层实现是 Hash 索引,应尽量使 Key唯一,避免潜在的哈希冲突。
Tag可以理解为是二级分类。以淘宝交易平台为例,订单消息和支付消息属于不同业务类型的消息,分别创建OrderTopic 和PayTopic,其中订单消息根据不同的商品品类以不同的 Tag 再进行细分,如手机类、家电类、男装类、女装类、化妆品类,最后它们都被各个不同的系统所接收。通过合理的使用 Topic 和 Tag,可以让业务结构清晰,更可以提高效率。
Key和Tag的主要差别是使用场景不同,Key主要用于通过命令行命令查询消息,而Tag用于在消息端的代码中,用来进行服务端消息过滤。
使用Key一般使用mqadmin管理工具,具体位置在RocketMQ/bin目录下。具体文档见:https://github.com/apache/rocketmq/blob/master/docs/cn/operation.md
使用Tag过滤的方式是在消息生产时传入感兴趣的Tag标签,然后在消费端就可以根据Tag来选择您想要的消息。具体的操作是在创建Message的时候添加,一个Message只能有一个Tag。
使用案例Tag生产者发送60条消息,分别打上三种tag标签。
public class TagFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("TagFilterProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
//todo设定三种标签
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 60; i++) {
Message msg = new Message("TagFilterTest",
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
Tag消费者
public class TagFilterConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilterComsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TagFilterTest", "TagA || TagC");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("收到消息:" + "topic:" + topic + ",tags:" + tags
+ ",a:" + msgPro + ",msg:" + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
注意事项
Tag过滤的形式非常简单,||代表或、*代表所有,所以使用Tag过滤这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。
8.2Sql过滤SQL特性可以通过发送消息时的属性来进行消息的过滤计算。具体的操作是使用SQL92标准的sql语句,前提是只有使用push模式的消费者才能用(消费的模式就是push)
SQL基本语法数值比较:比如:>,>=,<,<=,BETWEEN,=;
字符比较:比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号:AND,OR,NOT;
常量支持类型为:
数值,比如:123,3.1415;
字符,比如:‘abc’,必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE
Sql过滤需要Broker开启这项功能(如果消费时使用SQL过滤抛出异常错误,说明Sql92功能没有开启),需要修改Broker.conf配置文件。加入enablePropertyFilter=true 然后重启Broker服务。
使用案例 消息生产者,发送消息时加入消息属性,你能通过putUserProperty来设置消息的属性,以下案例中生产者发送10条消息,除了设置Tag之外,另外设置属性a的值。
SQL生产者
public class SqlFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//设置SQL过滤的属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
SQL消费者
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("收到消息:" + "topic:" + topic + ",tags:" + tags
+ ",a:" + msgPro + ",msg:" + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
结果显示
public class ProducerDetails {
public static void main(String[] args) throws Exception {
//todo producerGroup:生产者所属组(针对 事务消息 高可用
DefaultMQProducer producer = new DefaultMQProducer("produce_details");
//todo 默认主题再每一个Broker队列数量(对于新创建主题有效)
producer.setDefaultTopicQueueNums(8);
//todo 发送消息默认超时时间,默认3s(3000ms)
producer.setSendMsgTimeout(3000);
//消息体超过该值则自动压缩,默认4k
producer.setCompressMsgBodyOverHowmuch(1024 * 4);
//同步方式发送消息重试次数,默认2次,总共执行3次
producer.setRetryTimesWhenSendAsyncFailed(2);
//异步方式发送消息重试次数,默认为2,总共执行3次
producer.setRetryTimesWhenSendAsyncFailed(2);
//消息重试时选择另一个Broker时(消息没有存储成功是否发送到另一个broker),默认为false
producer.setRetryAnotherBrokerWhenNotStoreOK(false);
;
//允许发送的最大消息长度,默认为4M
producer.setMaxMessageSize(1024 * 1024 * 4);
//设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
;
//启动Producer实例
producer.start();
//查找该主题下所有消息队列
List MessageQueue = producer.fetchPublishMessageQueues("TopicTest");
for (int i = 0; i < MessageQueue.size(); i++) {
System.out.println(MessageQueue.get(i).getQueueId());
}
for (int i = 0; i < 10; i++) {
final int index = i;
//创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest", "TagA", "OrderID888",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//单项发送
//1.1发送单项消息
producer.sendOneway(msg);
//1.2指定队列单项消息发送(使用select方法)
producer.sendOneway(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
return mqs.get(0);
}
}, null);
//1.3指定队列单向发送消息(根据之前查出来的主题)
producer.sendOneway(msg, MessageQueue.get(0));
//同步发送
//2.1同步发送消息
SendResult sendResult0 = producer.send(msg);
//2.1同步超时发送消息
SendResult sendResult1 = producer.send(msg, 1000 * 3);
//2.2指定队列同步发送消息(使用select方法)
SendResult sendResult2 = producer.send(msg, new MessageQueueSelector() {
@Override
public org.apache.rocketmq.common.message.MessageQueue select(List mqs, Message msg, Object arg) {
return mqs.get(0);
}
}, null);
//2.3指定队列同步发送消息(根据之前查找出来的主题队列消息)
SendResult sendResult3 = producer.send(msg, MessageQueue.get(0));
//异步发送
//3.1异步发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
;
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %S %n", index, e);
e.printStackTrace();
}
});
//3.1异步超时发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
;
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %S %n", index, e);
e.printStackTrace();
}
}, 1000 * 3);
//3.3选择指定队列异步发送消息(使用select方法)
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
return mqs.get(0);
}
},
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
;
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %S %n", index, e);
e.printStackTrace();
}
});
}
Thread.sleep(10000);
//如果不在发送消息,关闭Producer实例
producer.shutdown();
}
}
9.2 消息消费时重要属性和方法
public class ConsumerDetail {
public static void main(String[] args) throws Exception {
//属性
//consumerGroup;消费者组
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("king");
//指定Namesrv地址信息
consumer.setNamesrvAddr("127.0.0.1:9876");
//消息消费模式(默认集群消费)
consumer.setMessageModel(MessageModel.CLUSTERING);
//指定消费开始偏移量(上次消费偏移量,最大偏移量,最小偏移量,启动时间戳)开始消费(推荐CONSUME_FROM_LAST_OFFSET)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//消费者最小线程数量(默认20)
consumer.setConsumeThreadMin(20);
//消费者最大线程数量(默认20)
consumer.setConsumeThreadMax(20);
//推模式下任务间隔时间(推模式也是基于不断地轮训拉取的封装)
consumer.setPullInterval(0);
//推模式下任务拉取的条数,默认32条(一批批拉)
consumer.setPullBatchSize(32);
//消息重试次数,-1代表16次(超过 次数成为死信消息 如果想要演示死信消息可以设置为1或者2)
consumer.setMaxReconsumeTimes(-1);
//消息消费超时时间(消息可能阻塞正在使用的线程的最大时间:以分钟为单位)
consumer.setConsumeTimeout(15);
//获取消费者对主题分配了哪些消息队列
Set MessageQueueSet= consumer.fetchSubscribeMessageQueues("Topictest");
Iterator iterator=MessageQueueSet.iterator();
while (iterator.hasNext()){
MessageQueue MessageQueue=(MessageQueue)iterator.next();
System.out.println(MessageQueue.getQueueId());
}
//方法-订阅
//基于主题订阅消息,消息过滤使用表达式
consumer.subscribe("TopicTest","*"); //tag tagA|tagB|tagC
//基于主题订阅消息,消息过滤使用表达式
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
//基于主题订阅消息,消息过滤使用表达式
consumer.subscribe("TopicTest", MessageSelector.byTag("tagA|tagB"));
//取消消息订阅
consumer.unsubscribe("TopicTest");
//注册监听器
//注册并发事件监听器(顺序可能乱序)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags=msg.getTags();
System.out.println("收到消息:" + "topic:" + topic + ",tags:" + tags
+",msg:" + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
//没有成功到重试队列中来
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//注册顺序消息时间监听器(为了确保消息的有序)
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random=new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
//可以看到每个queue有唯一的consume线程来消费,订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ",context:" + context.getMessageQueue());
}
try {
//模拟业务逻辑处理中...
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
} catch (InterruptedException e) {
e.printStackTrace();
//这个点要注意:意思是先等一会,一会再处理这批消息,而不是放到重试队列
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//启动消费者
consumer.start();
System.out.println("Consumer Started.%n");
}
}
十、RocketMQ的高可用机制(一般场景采用同步复制与异步刷盘)
Master1会把消息复制到Slave1当中,而当Master1挂掉时,Consumer集群会从Slave1中获取到还未处理的已经复制的消息,但是Master1无法从Producer集群中获得消息,于是就形成了高可用机制
仔细查看broker-x.properties会发现
主节点的brokerRole都为 brokerRole=SYNC_MASTER(代表的为同步复制)
从节点的brokerRole都为 brokerRole=SLAVE
broker-a.properties中的brokerId=0,为broker-a主节点
broker-a-s.properties中的brokerId=1,为broker-a的从节点
broker-b.properties中的brokerId=0,为broker-b主节点
broker-b-s.properties中的brokerId=1,为broker-b的从节点
于是就形成了双主双从的同步模式,可以确保消息不丢失,但是消息发送的效率较低
在同步复制方式下,如果Master出故障,Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量
10.1.2 异步复制(等master写入成功就会返回成功消息)主节点的brokerRole都为 brokerRole=ASYNC_MASTER(代表的为异步复制)
从节点的brokerRole都为 brokerRole=SLAVE
于是就形成了双主双从的异步模式,比起同步模式效率更高,
&Emsp; 在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失
10.2 同步刷盘与异步刷盘 10.2.1 同步刷盘 在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态
当配置的为多master多slave的同步模式时候,只有当消息复制到slave并且消息已经同步刷到磁盘当中,才会给生产者进行消息确认(可靠性最高,但是效率最低)
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入
存在性能问题,数据要到磁盘,磁盘的速度比较慢,会造成生产者发送数据的一个缓慢性
10.2多master多slave的同步复制与异步刷盘采用多master多slave的同步复制与异步刷盘可以让备份数据到从节点当中进入进行数据的备份,提高消息发送的高并发,同时确保消息的不丢失
十一、RocketMQ存储设计 11.1 消费的并发度要解决消费并发,就是要利用Queue,一个Topic可以分出更多的queue,每一个queue可以存放在不同的硬件上来提高并发
11.2 热点问题(顺序消费,消息重复) 要确保消息的顺序,生产者、队列、消费者最好都是一对一的关系。但是这样设计,并发度就会成为消息系统的瓶颈(并发度不够)。RocketMQ不解决这个矛盾的问题,理由如下:
1、乱序的应用实际大量存在
2、队列无序并不意味着消息无序,另外还有消息重复,造成消息重复的根本原因是:网络不可达(网络波动)。所以如果消费者收到两条一样的消息,应该怎么处理?
RocketMQ不保证消息不重复,如果你的业务要严格确保消息不重复,需要在自己的业务端进行去重。
1、消费端处理消息的业务逻辑保持幂等性
2、确保每一条消息都有唯一的编号并且保证消息处理成功与去重的日志同时出现
RocketMQ因为有高可靠性的要求(宕机不丢失数据),所以数据要进行持久化存储,所以RocketMQ采用文件进行存储
12.1 存储文件与消息的存储结构
RocketMQ的国企文件删除机制
RocketMQ是靠半消息机制实现分布式事务:
事务消息:MQ 提供类似 X/Open XA 的分布事务功能,通过 MQ 事务消息能达到分布式事务的最终一致。
半消息:暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此 时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
半消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。
1.发送方向 MQ 服务端发送事务消息;
2.MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
3.发送方开始执行本地事务逻辑。
4.发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
5.在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
6.发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
7.发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。
15.1 分布式事务管理代码实现 分布式事务管理监听里Listenerpublic class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap localTrans = new ConcurrentHashMap<>();
//执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //设置日期格式
//执行本地事务 update A。。。
//System.out.println("update A ... where transactionId:"+msg.getTransactionId()+":"+df.format(new Date()));
//System.out.println("commit");
//情况1:本地事务成功
//return LocalTransactionState.COMMIT_MESSAGE
//情况2:本地事务失败
//return LocalTransactionState.ROLLBACK_MESSAGE
//情况3:业务复杂,还处于中间过程或者依赖其他操作的返回结果,就是unknow
System.out.println("业务比较长,还没有处理完,不知道成功还是失败!");
return LocalTransactionState.UNKNOW;
}
//事务回查 默认是60s 一分钟检查一次
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//打印每次回查的时间
SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //设置日期格式
System.out.println("checkLocalTransaction:"+df.format(new Date())); //new Date(为获取当前系统时间)
//情况3.1:业务回查成功
System.out.println("业务回查:执行本地事务成功,确认消息");
return LocalTransactionState.COMMIT_MESSAGE;
//情况3.2 业务回查回滚
//System.out.println("业务回查,执行本地事务失败,删除消息");
//return LocalTransactionState.ROLLBACK_MESSAGE;
//情况3.3 业务回查还是UNKNOW!
//System.out.println("业务比较长,还没有处理完,不知道是成功还是失败!");
//return LocalTransactionState.UNKNOW;
// Integer status = localTrans.get(msg.getTransactionId());
// if (null != status) {
// switch (status) {
// case 0:
// return LocalTransactionState.UNKNOW;
// case 1:
// return LocalTransactionState.COMMIT_MESSAGE;
// case 2:
// return LocalTransactionState.ROLLBACK_MESSAGE;
// default:
// return LocalTransactionState.COMMIT_MESSAGE;
// }
// }
// return LocalTransactionState.COMMIT_MESSAGE;
}
}
分布式事务管理消费者
public class TransactionConsumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("TranscationComsuer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TransactionTopic","*");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
try {
//开启事务
for (MessageExt msg : msgs) {
//执行本地事务 update B...(幂等性)
System.out.println("update B... where transactionId:" + msg.getTransactionId());
//本地事务成功
System.out.println("commit:" + msg.getTransactionId());
System.out.println("执行本地事务成功,确认消息");
}
}catch (Exception e){
e.printStackTrace();
System.out.println("执行本地事务失败,重试消费,尽量确保B处理成功");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
}
}
分布式事务管理生产者
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
//创建线程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//设置生产者回查线程池
producer.setExecutorService(executorService);
//生产者设置监听器
producer.setTransactionListener(transactionListener);
//启动消费生产者
producer.start();
//1、半事务的发送
try {
Message msg =
new Message("TransacionTopic", null, ("A向B系统转100块钱").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //设置日期格式
System.out.printf(sendResult.getSendStatus() + "-" + df.format(new Date())); //半事务消息是否成功
} catch (MQClientException | UnsupportedEncodingException e) {
//如果失败回滚事务
e.printStackTrace();
}
//2、半事务的发送成功
//一些长时间等待的业务(比如输入密码,确认等操作):需要通过事务回查来处理
for (int i = 0; i < 1000; i++) {
Thread.sleep(1000);
}
for (
int i = 0;
i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
结果显示
事务成功,生产者发送消息,消费者接收消息
事务失败
生产者事务失败,消费者没有收到消息
当进行事务回查的时候,生产者发送消息,因为业务长所有未处理完,消费者没有接收到消息
事务回查成功,消息发送在14:57:56,但是消费者接收到消息再14:59:01
如果在事务回查的过程中发生了宕机,于是可以创建一个ListenerImpl2与TransactionProducer2,通过TransationProducer发送消息时发生了宕机行为,那么TransactionProducer2会继续回查该消息,完成事务的回查操作
生产者Producer
生产者Producer2
消费者



