消息生产者
package com.rocketmqpro.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class RMQPro {
public static void main(String[] args) {
final boolean[] close = {false};
DefaultMQProducer producer = new DefaultMQProducer("Producer");
// 必须要设置nameserver地址
producer.setNamesrvAddr("127.0.0.1:9876");
try {
producer.setClientIP("**");
//设置实例名称
producer.setInstanceName("rocketMQ");
//设置重试的次数
producer.setRetryTimesWhenSendFailed(3);
//开启生产者
producer.start();
//创建一条消息
Message msg = new Message("PushTopic", "PushTopic", "PushTopic_01", "内容一".getBytes());
//发送,并触发回调函数
producer.send(msg, new SendCallback() {
@Override
//成功的回调函数
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult.getSendStatus());
State.sta = true;
}
@Override
//出现异常的回调函数
public void onException(Throwable e) {
System.out.println("SEND_EXCEPTION:" + e.getMessage());
State.sta = true;
}
});
//获取某个主题的消息队列
//List messageQueues = producer.fetchPublishMessageQueues("PushTopic");
//System.out.println("消息队列数量:" + messageQueues.size());
//for (int i = 0; i < messageQueues.size(); i++) {
// System.out.println(messageQueues.get(i).toString());
//}
} catch (Exception e) {
e.printStackTrace();
} finally {
while (!State.sta) {
}
producer.shutdown();
System.out.println("producer shutdown......");
}
}
}
package com.rocketmqpro.producer;
public class State {
public volatile static boolean sta=false;
}
消息消费者(用的springboot)
server: port: 8080 server-header: rocketmq: name-server: 127.0.0.1:9876 #私有云消费mq地址
4.0.0 org.springframework.boot spring-boot-starter-parent2.4.10 com.example rocketmqpro0.0.1-SNAPSHOT rocketmqpro 1.8 org.springframework.boot spring-boot-starterorg.springframework.boot spring-boot-starter-weborg.apache.rocketmq rocketmq-spring-boot-starter2.1.0 org.apache.rocketmq rocketmq-client4.6.0 org.springframework.boot spring-boot-starter-testtest org.springframework.boot spring-boot-maven-plugin
package com.rocketmqpro;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RocketmqproApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqproApplication.class, args);
}
}
package com.rocketmqpro.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "PushTopic", consumerGroup = "jhc") public class RMQCon implements RocketMQListener广播消费 一条消息被多个 Consumer 消费,即使这些 Consumer 属于同一个 Consumer Group ,消息也会 被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。 集群消费 一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其 中一个 Consumer Group 有 3 个实例 ( 可能是 3 个进程,或者 3 台机器 ) ,那举每个实例只消费其中的 3条消息。 标签(Tag) 为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不 同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。 生产者发送消息发生返回状态(SendResult#SendStatus)有如下四种: FLUSH_DISK_TIMEOUT FLUSH_SLAVE_TIMEOUT SLAVE_NOT_AVAILABLE SEND_OK 不同状态在不同的刷盘策略和同步策略的配置下含义是不同的: 1. FLUSH_DISK_TIMEOUT: 表示没有在规定时间内完成刷盘(需要 Broker 的刷盘策略被设置成 SYNC_FLUSH 才会报这个错误)。 2. FLUSH_SLAVE_TIMEOUT: 表示在主备方式下,并且 Broker 被设置成 SYNC_MASTER 方式, 没有在设定时间内完成主从同步。 3. SLAVE_NOT_AVAILABLE: 这个状态产生的场景和 FLUSH_SLAVE_TIMEOUT 类似,表示在主 备方式下,并且 Broker 被设置成 SYNC_MASTER ,但是没有找到被配置成 Slave 的 Broker 。 4. SEND_OK: 表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是 否被同步到了 Slave 上?消息在 Slave 上是否被写入磁盘?需要结合所配置的刷盘策略、主从策 略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是 SEND_OK 。 在Linux操作系统层级进行调优,推荐使用EXT4文件系统,IO调度算法使用deadline算法。 消费上的优化 提高消费并行度 在 同一个 ConsumerGroup 下( Clustering 方式),可以通过 增加 Consumer 实例 的数量来提 高并行度。通过加机器,或者在已有机器中启动多个 Consumer 进程都可以增加 Consumer 实例数。 注意:总的 Consumer 数量不要超过 Topic 下 Read Queue 数量,超过的 Consumer 实例接收 不到消息。此外,通过提高单个 Consumer 实例中的 并行处理的线程数 ,可以在同一个 Consumer 内增加 并行度来提高吞吐量(设置方法是修改 consumeThreadMin 和 consumeThreadMax )。 以批量方式进行消费 某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中 涉及 update 某个数据库,一次 update10 条的时间会大大小于十次 update1 条数据的时间。 可以通过批量方式消费来提高消费的吞吐量。实现方法是设置 Consumer 的 consumeMessageBatchMaxSize 这个参数,默认是 1 ,如果设置为 N ,在消息多的时候每次 收到的是个长度为 N 的 消息链表 。 检测延时情况,跳过非重要消息 Consumer 在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆 积,这个时候可以选择丢弃不重要的消息,使 Consumer 尽快追上 Producer 的进度。, RocketMQPushConsumerLifecycleListener { @Override public void onMessage(MessageExt message) { System.out.println(new String(message.getBody())); } @Override public void prepareStart(DefaultMQPushConsumer consumer) { //从消息队列头部开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //设置广播消费模式 consumer.setMessageModel(MessageModel.BROADCASTING); } }



