下载地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.3/rocketmq-all-4.9.3-source-release.zip
> unzip rocketmq-all-4.9.3-source-release.zip > cd rocketmq-all-4.9.3/ > mvn -Prelease-all -DskipTests clean install -U > cd distribution/target/rocketmq-4.9.3/rocketmq-4.9.3启动NameServer
> nohup sh bin/mqnamesrv & > tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...启动Broker
> nohup sh bin/mqbroker -n localhost:9876 & > tail -f ~/logs/rocketmqlogs/broker.log The broker[%s, 172.30.30.233:10911] boot success...测试消息的发送和消费
> export NAMESRV_ADDR=localhost:9876 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer SendResult [sendStatus=SEND_OK, msgId= ... > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ConsumeMessageThread_%d Receive New Messages: [MessageExt...停止服务
如果需要停止服务,执行如下命令
> sh bin/mqshutdown broker The mqbroker(36695) is running... Send shutdown request to mqbroker(36695) OK > sh bin/mqshutdown namesrv The mqnamesrv(36664) is running... Send shutdown request to mqnamesrv(36664) OK部署RocketMQ Dashbord
为了方便管理,我们还需要Dashbord
下载地址:https://gitee.com/nswish/rocketmq-dashboard/repository/archive/master.zip
修改配置文件application.properties,增加配置rocketmq.config.namesrvAddr地址
# 省略... rocketmq.config.namesrvAddr=localhost:9876 # 省略...
构建并启动
# Maven spring-boot run mvn spring-boot:run # 或 Maven build and run mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
访问:http://localhost:8080。
至此,RocketMQ已经部署完成。
普通消息 消息发送org.apache.rocketmq rocketmq-client 4.9.3
消息发送的步骤:
- 创建消息生产者 producer,并指定生产者组名指定 Nameserver 地址启动 producer创建消息对象,指定 Topic、Tag 和消息体发送消息关闭生产者 producer
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知等。
代码示例:
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("test_group");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" ,
"TagA" ,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
异步发送
异步消息通常用在对响应时间敏感的业务场景。
示例代码:
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("test_group");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
//启用Broker故障延迟机制
producer.setSendLatencyFaultEnable(true);
for (int i = 0; i < 100; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest", "TagA", "OrderID888",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
Thread.sleep(10000);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
单向发送
这种方式主要用在不需要关心发送结果的场景,例如日志发送。
示例代码:
public class OnewayProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer 对象。
DefaultMQProducer producer = new DefaultMQProducer("test_group");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
for (int i = 0; i < 20; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest" ,
"TagA" ,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
消息发送的权衡
| 发送方式 | 发送TPS | 发送结果反馈 | 可靠性 | 适用场景 |
|---|---|---|---|---|
| 同步发送 | 快 | 有 | 可靠 | 适用广泛,如重要的消息通知,短信通知等。 |
| 异步发送 | 快 | 有 | 可靠 | 对响应时间敏感的应用场景 |
| 单向发送 | 最快 | 有 | 不 可靠 | 可靠性要求不高的场景,如日志采集 |
- 创建消费者 Consumer,指定消费者组名指定 Nameserver 地址订阅主题 Topic 和 Tag设置回调函数,处理消息启动消费者 consumer
一个 Consumer Group 中的各个 Consumer 实例分摊去消费消息,即一条消息只会投递到一个 Consumer Group 下面的一个实例。实际上,每个 Consumer 是平均分摊 Message Queue 的。例如,一个 Topic 有3个 Queue,其中一个Consumer Group 有3个实例,那么每个实例只消费其中一个Queue。
这种模式下,消费进度(Consumer Offset)的存储会持久化到 Broker。
代码示例,启动同一分组下的两个消费者
public class BalanceComuser {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.setMaxReconsumeTimes(1);
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
广播消费
消息将对一个 Consumer Group 下的各个 Consumer 实例都投递一遍。实际上,是一个消费组下的每个消费者实例都获取到了 topic 下面的每个 Message Queue 去拉取消费。
这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。
代码示例,启动统一分组下的两个消费者
public class BroadcastComuser {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("TopicTest", "*");
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
// 如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消息消费的权衡
集群模式
- 消费端集群化部署,每条消息只需要被处理一次。由于消费进度在服务端维护,可靠性更高。集群消费模式下,每一条消息都只会被分发到一台机器上处理。
- 每条消息都需要被相同逻辑的多台机器处理。消费进度在客户端维护,出现重复的概率稍大于集群模式。不支持顺序消息、不支持重置消费位点。广播模式下, RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
在默认的情况下,消息发送会采取轮询方式把消息发送到不同的 queue;而消费消息的时候是从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序的。
但是如果控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序。当发送和消费参与的 queue 只有一个,则是全局有序;如果多个 queue 参与,则为分区有序,即相对每个 queue,消息都是有序的。
全局顺序消息:
分区顺序消息:
一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,下面是订单进行分区有序的示例代码。
public class ProducerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订单列表
List orderList = new ProducerInOrder().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < orderList.size(); i++) {
// 加个时间前缀
String body = dateStr + " Order:" + orderList.get(i);
Message msg = new Message("PartOrder", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
@Getter
@Setter
@ToString
private static class Order {
private long orderId;
private String desc;
}
private List buildOrders() {
List orderList = new ArrayList();
Order orderDemo = new Order();
orderDemo.setOrderId(20210406001L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406002L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406001L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406003L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406002L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406003L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406002L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406003L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406002L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406001L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406003L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new Order();
orderDemo.setOrderId(20210406001L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
}
运行结果:
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='创建'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='创建'}
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='付款'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='创建'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='付款'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='付款'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='推送'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='推送'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='完成'}
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='推送'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='完成'}
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='完成'}
使用顺序消息时,首先要保证消息是有序进入 MQ 的,对 id 等关键字进行取模后,放入指定 messageQueue中,Consume 消费消息失败时, 不能返回 reconsume_later,这样会导致乱序,应该返回 suspend_current_queue_a_moment。
消费时,同一个 OrderId 获取到的肯定是同一个队列。从而确保一个订单中处理的顺序。
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("PartOrder", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
} catch (Exception e) {
e.printStackTrace();
//这个点要注意:意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
运行结果:
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='创建'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='创建'}
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='付款'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='创建'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='付款'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='付款'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='推送'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='推送'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='完成'}
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='推送'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='完成'}
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='完成'}
延时消息
Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费, 该消息即延时消息。
消息生产和消费有时间窗口要求;比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。
Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化, 那么消息排序要不可避免的产生巨大性能开销。
延迟消息是根据延迟队列的 level 来的,延迟队列默认是msg.setDelayTimeLevel(3)代表延迟 10 秒;、“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”。源码中:org/apache/rocketmq/store/config/MessageStoreConfig.java
生产者示例代码:
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
int totalMessagesToSend = 10;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后投递给消费者(详看delayTimeLevel)
// delayTimeLevel:(1~18个等级)"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
message.setDelayTimeLevel(4);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown();
}
}
消费者示例代码:
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topics
consumer.subscribe("ScheduledTopic", "*");
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ (message.getStoreTimestamp() - message.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
}
}
批量消息
批量发送消息能显著提高传递消息的性能。限制是,这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过 4MB。
生产者示例代码:
public class BatchProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
String topic = "BatchTest";
List messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
producer.shutdown();
e.printStackTrace();
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
消费者示例代码:
public class BatchComuser {
public static void main(String[] args) throws Exception {
// 实例化消息生产者,指定组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchComsuer");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("BatchTest", "*");
//负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
批量切分
如果消息的总长度可能大于 4MB 时,这时候需要把消息进行分割。
生产者示例代码:
public class SplitBatchProducer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
//large batch
String topic = "BatchTest";
List messages = new ArrayList<>(100 * 1000);
//10万元素的数组
for (int i = 0; i < 100 * 1000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
//把大的消息分裂成若干个小的消息(1M左右)
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List listItem = splitter.next();
producer.send(listItem);
Thread.sleep(100);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
System.out.printf("Consumer Started.%n");
}
}
class ListSplitter implements Iterator> {
private int sizeLimit = 1000 * 1000;//1M
private final List messages;
private int currIndex;
public ListSplitter(List messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map properties = message.getProperties();
for (Map.Entry entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > sizeLimit) {
//单个消息超过了最大的限制(1M)
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > sizeLimit) {
break;
} else {
totalSize += tmpSize;
}
}
List subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Not allowed to remove");
}
}
过滤消息
Tag 过滤
在大多数情况下,TAG 是一个简单而有用的设计,其可以来选择您想要的消息。
生产者示例代码:
public class TagFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("TagFilterProducer");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 60; i++) {
Message msg = new Message("TagFilterTest",
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
消费者示例代码:
public class TagFilterConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilterComsumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TagFilterTest", "TagA || TAGB || TAGC");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : " + msgPro + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消费者将接收包含 TAGA 或 TAGB 或 TAGC 的消息。但是一个消息只能有一个标签。在这种情况下,可以使用 SQL 表达式筛选消息。SQL 特性可以通过发送消息时的属性来进行计算。
Sql 过滤 SQL 基本语法RocketMQ 定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,常用的语句如下:
数值比较: 比如:>,>=,<,<=,BETWEEN,=;
字符比较: 比如:=,<>,IN; IS NULL 或者 IS NOT NULL; 逻辑符号:AND,OR,NOT;
常量支持类型为: 数值,比如:123,3.1415; 字符,比如:‘abc’,必须用单引号包裹起来; NULL,特殊的常量;布尔值,TRUE 或 FALSE
生产者示例代码:
public class SqlFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
Message msg = new Message("SqlFilterTest",
tags[i % tags.length],
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}
消费者示例代码:
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
consumer.setNamesrvAddr("localhost:9876");
// Don't forget to set enablePropertyFilter=true in broker
consumer.subscribe("SqlFilterTest",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
"and (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : " + msgPro + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
如果出现如下报错错误:说明 Sql92 功能没有开启
需要修改 Broker.conf 配置文件。
加入 enablePropertyFilter=true 然后重启 Broker 服务。
如图,事务消息分为两个流程:
正常事务消息的发送和提交(1,2,3,4)事务消息的补偿流程(4,6,7) 正常事务流程
- 发送半事务消息。服务端响应半事务消息的发送结果。根据发送结果执行本地事务。如果写入失败,此时半消息对业务不可见,本地逻辑不执行。根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见)。
补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。
- 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”。Producer 收到回查消息,检查回查消息对应的本地事务的状态。根据本地事务状态,重新 Commit 或者 Rollback。
事务消息共有三种状态:
TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息。
TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown:它代表需要检查消息队列来确定状态。
使用 TransactionMQProducer 类创建生产者,并指定唯一的 ProducerGroup,通过设置自定义线程池来处理事务回查请求。
执行本地事务后、需要根据执行结果对消息队列进行回复。
生产者示例代码
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("hzy_produce");
producer.setNamesrvAddr("localhost:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//设置生产者回查线程池
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。checkLocalTransaction 方法用于检查本地事务状态。
事务监听器示例代码
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
常用的属性和方法
生产者
public class ProducerDetails {
public static void main(String[] args) throws Exception{
// producerGroup:生产者所属组(针对 事务消息 高可用)
DefaultMQProducer producer = new DefaultMQProducer("produce_details");
// 默认主题在每一个Broker队列数量(对于新创建主题有效)
producer.setDefaultTopicQueueNums(8);
// 发送消息默认超时时间,默认3s (3000ms)
producer.setSendMsgTimeout(1000*3);
// 消息体超过该值则启用压缩,默认4k
producer.setCompressMsgBodyOverHowmuch(1024 * 4);
// 同步方式发送消息重试次数,默认为2,总共执行3次
producer.setRetryTimesWhenSendFailed(2);
// 异步方式发送消息重试次数,默认为2,总共执行3次
producer.setRetryTimesWhenSendAsyncFailed(2);
// 消息重试时选择另外一个Broker时(消息没有存储成功是否发送到另外一个broker),默认为false
producer.setRetryAnotherBrokerWhenNotStoreOK(false);
// 允许发送的最大消息长度,默认为4M
producer.setMaxMessageSize(1024 * 1024 * 4);
// 设置NameServer的地址
producer.setNamesrvAddr("106.55.246.66:9876");//106.55.246.66
// 启动Producer实例
producer.start();
// 0 查找该主题下所有消息队列
List MessageQueue = producer.fetchPublishMessageQueues("TopicTest");
for (int i = 0; i < MessageQueue.size(); i++) {
System.out.println(MessageQueue.get(i).getQueueId());
}
for (int i = 0; i < 10; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest", "TagA", "OrderID888",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 单向发送
// 1.1发送单向消息
producer.sendOneway(msg);
// 1.2指定队列单向发送消息(使用select方法)
producer.sendOneway(msg,new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
return mqs.get(0);
}
},null);
// 1.3指定队列单向发送消息(根据之前查找出来的主题)
producer.sendOneway(msg,MessageQueue.get(0));
// 同步发送
// 2.1同步发送消息
SendResult sendResult0 = producer.send(msg);
// 2.1同步超时发送消息(属性设置:sendMsgTimeout 发送消息默认超时时间,默认3s (3000ms) )
SendResult sendResult1 = producer.send(msg,1000*3);
// 2.2指定队列同步发送消息(使用select方法)
SendResult sendResult2 = producer.send(msg,new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
return mqs.get(0);
}
},null);
// 2.3指定队列同步发送消息(根据之前查找出来的主题队列信息)
SendResult sendResult3 = producer.send(msg,MessageQueue.get(0));
// 异步发送
// 3.1异步发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
}
});
// 3.1异步超时发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
}
},1000*3);
// 3.2选择指定队列异步发送消息(根据之前查找出来的主题队列信息)
producer.send(msg,MessageQueue.get(0),
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
}
});
// 3.3选择指定队列异步发送消息(使用select方法)
producer.send(msg,new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
return mqs.get(0);
}
},
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
}
});
}
Thread.sleep(10000);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
消费者
public class ComuserDetails {
public static void main(String[] args) throws Exception {
// 属性
// consumerGroup:消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("king");
// 指定Namesrv地址信息.
consumer.setNamesrvAddr("106.55.246.66:9876");
// 消息消费模式(默认集群消费)
consumer.setMessageModel(MessageModel.CLUSTERING);
// 指定消费开始偏移量(上次消费偏移量、最大偏移量、最小偏移量、启动时间戳)开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 消费者最小线程数量(默认20)
consumer.setConsumeThreadMin(20);
// 消费者最大线程数量(默认20)
consumer.setConsumeThreadMax(20);
// 推模式下任务间隔时间(推模式也是基于不断的轮训拉取的封装)
consumer.setPullInterval(0);
// 推模式下任务拉取的条数,默认32条(一批批拉)
consumer.setPullBatchSize(32);
// 消息重试次数,-1代表16次 (超过 次数成为死信消息)
consumer.setMaxReconsumeTimes(-1);
// 消息消费超时时间(消息可能阻塞正在使用的线程的最大时间:以分钟为单位)
consumer.setConsumeTimeout(15);
// 获取消费者对主题分配了那些消息队列
Set MessageQueueSet = consumer.fetchSubscribeMessageQueues("TopicTest");
Iterator iterator = MessageQueueSet.iterator();
while(iterator.hasNext()){
MessageQueue MessageQueue =(MessageQueue)iterator.next();
System.out.println(MessageQueue.getQueueId());
}
// 方法-订阅
// 基于主题订阅消息,消息过滤使用表达式
consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC
// 基于主题订阅消息,消息过滤使用表达式
consumer.subscribe("TopicTest",MessageSelector.bySql("a between 0 and 3"));
// 基于主题订阅消息,消息过滤使用表达式
consumer.subscribe("TopicTest",MessageSelector.byTag("tagA|TagB"));
// 取消消息订阅
consumer.unsubscribe("TopicTest");
// 注册监听器
// 注册并发事件监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
//没有成功 -- 到重试队列中来
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//
}
});
// 注册顺序消息事件监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
} catch (Exception e) {
e.printStackTrace();
// 这个点要注意:意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
//启动消息者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}



