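The previous post covered RocketMQ's basic concepts and walked through installing and starting RocketMQ. This post moves on to hands-on coding.
It focuses on practicing the three ways of sending messages.
I. Creating the base projects
Two projects are needed, one acting as the consumer and one as the producer.
Create two Spring Boot projects, add the Maven dependencies, and configure application.yaml.
The Maven dependencies to add are rocketmq-common, rocketmq-client, and fastjson.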
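<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-common</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.49</version>
</dependency>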
application.yaml
rocketmq:
  consumer:
    # RocketMQ NameServer address (replace your-ip with your server's IP)
    namesrvAddr: your-ip:9876
    groupName: test-demo
    instanceName: consumer.demo
  producer:
    sendMsgTimeout: 10000
    maxMessageSize: 999999999
    compressOver: 40000
  topic: test-demo
  # tag; several tags can be combined in one subscription, e.g. "0||1||2"
  tag: "111"
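The comment on `tag` mentions expressions of the form "0||1||2". As a rough illustration of what such an expression selects, the sketch below splits it on `||` and checks a message tag against the result. This is not RocketMQ's actual filtering code: the class `TagExpressionSketch` and its `matches` method are hypothetical names made up for this example, and treating an empty expression like `*` is an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of the RocketMQ client.
final class TagExpressionSketch {

    // Returns true if messageTag is selected by an expression such as "0||1||2".
    static boolean matches(String expression, String messageTag) {
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true; // assumption: an empty expression or "*" selects every tag
        }
        // Split on the literal "||" separator and keep the non-empty tags
        Set<String> wanted = Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("0||1||2", "1"));   // true
        System.out.println(matches("0||1||2", "111")); // false
        System.out.println(matches("111", "111"));     // true
    }
}
```

With the configuration above, the consumer subscribes with the single tag "111", so only messages sent with exactly that tag would be selected.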
In the RocketMQConsumer project, add a configuration class that creates the Consumer, then start the project.
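import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RocketMQConfig {
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.instanceName}")
    private String instanceName;
    @Value("${rocketmq.producer.sendMsgTimeout}")
    private int sendMsgTimeout;
    @Value("${rocketmq.producer.maxMessageSize}")
    private int maxMessageSize;
    @Value("${rocketmq.producer.compressOver}")
    private int compressOver;
    @Value("${rocketmq.topic}")
    private String topic;
    @Value("${rocketmq.tag}")
    private String tag;

    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.groupName);
        consumer.setNamesrvAddr(this.namesrvAddr);
        consumer.setInstanceName(this.instanceName);
        // Hand messages to the listener one at a time
        consumer.setConsumeMessageBatchMaxSize(1);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        try {
            consumer.subscribe(this.topic, this.tag);
            consumer.registerMessageListener((MessageListenerConcurrently) (msgList, consumeConcurrentlyContext) -> {
                try {
                    for (MessageExt msg : msgList) {
                        // Decode with the same charset the producer used to encode
                        System.out.println("Received MQ message: " + new String(msg.getBody(), StandardCharsets.UTF_8));
                    }
                } catch (Exception e) {
                    // The original caught a non-existent JSonException; nothing here parses JSON
                    System.out.println(e);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
            System.out.println("Consumer started [group: " + this.groupName + ", instance: " + this.instanceName
                    + "], listening on topic {" + this.topic + "}, tag {" + this.tag + "}");
        } catch (MQClientException e) {
            System.out.println(e);
        }
        return consumer;
    }
}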
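The listener turns the raw bytes from `msg.getBody()` back into text, while the producer tests in this post encode the body with UTF-8. Both sides should agree on the charset, otherwise non-ASCII text can come out garbled. A minimal sketch of the round trip follows, using only the JDK; the class `BodyCharsetSketch` and its `encode`/`decode` methods are hypothetical names for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of the RocketMQ client.
final class BodyCharsetSketch {

    // What the producer side does before building a Message
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // What the consumer side should do with msg.getBody()
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "hello rocketMq \u6d88\u606f"; // ASCII plus two Chinese characters
        byte[] body = encode(original);

        // Same charset on both sides: the text survives the round trip
        System.out.println(decode(body).equals(original)); // true

        // Mismatched charset: the Chinese characters are decoded incorrectly
        String wrong = new String(body, StandardCharsets.ISO_8859_1);
        System.out.println(wrong.equals(original)); // false
    }
}
```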
In the producer project, likewise create a configuration class that creates the Producer.
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SpringConfig {
    // The producer reuses the group name, address and instance name under rocketmq.consumer
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.instanceName}")
    private String instanceName;
    @Value("${rocketmq.producer.sendMsgTimeout}")
    private int sendMsgTimeout;
    @Value("${rocketmq.producer.maxMessageSize}")
    private int maxMessageSize;
    @Value("${rocketmq.producer.compressOver}")
    private int compressOver;
    @Value("${rocketmq.topic}")
    private String topic;
    @Value("${rocketmq.tag}")
    private String tag;

    @Bean
    public DefaultMQProducer getRocketMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(this.groupName);
        producer.setNamesrvAddr(this.namesrvAddr);
        producer.setInstanceName(this.instanceName);
        producer.setSendMsgTimeout(this.sendMsgTimeout);
        // Compress message bodies larger than this many bytes
        producer.setCompressMsgBodyOverHowmuch(this.compressOver);
        producer.setMaxMessageSize(this.maxMessageSize);
        try {
            System.out.println("Producer start...");
            producer.start();
        } catch (Exception e) {
            System.out.println(e);
        }
        return producer;
    }
}
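II. Testing
Three ways for a producer to send messages
| Send mode | Send TPS | Send result feedback | Reliability |
| --- | --- | --- | --- |
| Synchronous send | Fast | Yes | No loss |
| Asynchronous send | Fast | Yes | No loss |
| One-way send | Fastest | No | May be lost |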
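To make the differences in the table concrete, here is a minimal JDK-only sketch of the three calling styles. It does not use RocketMQ at all: the class `SendModesSketch`, its `transmit` method, and the "SEND_OK" acknowledgement string are stand-ins invented for this illustration, with a background thread playing the role of the network round trip.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical stand-in for a producer; not RocketMQ code.
final class SendModesSketch {
    // A daemon thread simulates the network so the JVM can always exit
    private final ExecutorService network = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-network");
        t.setDaemon(true);
        return t;
    });

    // Pretend round trip that eventually yields an acknowledgement
    private CompletableFuture<String> transmit(String body) {
        return CompletableFuture.supplyAsync(() -> "SEND_OK:" + body, network);
    }

    // Synchronous: block until the acknowledgement arrives
    String sendSync(String body) {
        return transmit(body).join();
    }

    // Asynchronous: return immediately, deliver the acknowledgement to a callback
    void sendAsync(String body, Consumer<String> onSuccess) {
        transmit(body).thenAccept(onSuccess);
    }

    // One-way: hand the message over and never look at the outcome
    void sendOneway(String body) {
        transmit(body);
    }

    void shutdown() {
        network.shutdown();
    }

    public static void main(String[] args) {
        SendModesSketch producer = new SendModesSketch();
        System.out.println(producer.sendSync("a")); // SEND_OK:a

        CompletableFuture<String> ack = new CompletableFuture<>();
        producer.sendAsync("b", ack::complete);
        System.out.println(ack.join()); // SEND_OK:b

        producer.sendOneway("c"); // no result to print
        producer.shutdown();
    }
}
```

The one-way caller has no way of learning whether "c" ever arrived, which is why the table lists that mode as fastest but possibly lossy.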
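The tests are added to the producer project.
In the matching location under the test directory, create a test class (or modify the existing one) and autowire the producer instance:
@Autowired
DefaultMQProducer producer;
1. Synchronous send
/**
 * Reliable synchronous send
 * Meaning:
 *   With synchronous sending, the sender sends the next packet only after
 *   it has received the receiver's response to the previous one.
 * How:
 *   Call the send method of DefaultMQProducer.
 * Use cases:
 *   Important notifications, SMS notifications, and so on.
 */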
@Test
void reliableSynchronousT() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
    for (int i = 0; i < 10; i++) {
        final int index = i;
        // Topic "test-demo" and tag "111" match application.yaml
        Message msg = new Message("test-demo",
                "111",
                ("hello rocketMq" + index).getBytes(StandardCharsets.UTF_8)
        );
        // send() blocks until the broker responds
        SendResult sendResult = producer.send(msg);
        System.out.println(sendResult.getSendStatus() + " " + i);
    }
}
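Run the test.
In the producer window, the messages are sent one after another.
The consumer window shows the messages as they are received.
2. Asynchronous send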
@Test
void asyncT() throws RemotingException, InterruptedException, MQClientException {
    // Do not retry internally when an asynchronous send fails
    producer.setRetryTimesWhenSendAsyncFailed(0);
    int messageCount = 10;
    // One count per message, so the test can wait for every callback
    CountDownLatch countDownLatch = new CountDownLatch(messageCount);
    for (int i = 0; i < messageCount; i++) {
        final int index = i;
        Message msg = new Message("test-demo",
                "111",
                ("hello rocketMq by Async" + index).getBytes(StandardCharsets.UTF_8)
        );
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                countDownLatch.countDown();
                System.out.println("Sent: " + index);
            }
            @Override
            public void onException(Throwable throwable) {
                countDownLatch.countDown();
                System.out.println("Send " + index + " failed: " + throwable);
            }
        });
    }
    // Keep the test thread alive until the callbacks have run; otherwise the sends fail
    countDownLatch.await(5, TimeUnit.SECONDS);
}
Producer window
Consumer window
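The asynchronous test has to keep the test thread waiting, since the sends fail if the test finishes before the callbacks have run. One common way to do that is a CountDownLatch that is counted down once per callback and awaited once after the loop. The sketch below shows the pattern with a plain thread pool standing in for the client's callback threads; the class `AsyncLatchSketch` and its fake "send" are assumptions made for illustration, not RocketMQ code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of waiting for asynchronous callbacks.
final class AsyncLatchSketch {

    // Fires `messages` fake asynchronous sends and waits until every callback has run.
    // Returns the number of callbacks observed, or -1 on timeout or interruption.
    static int sendAllAndWait(int messages) {
        ExecutorService callbackPool = Executors.newFixedThreadPool(2);
        CountDownLatch latch = new CountDownLatch(messages); // one count per message
        AtomicInteger succeeded = new AtomicInteger();
        try {
            for (int i = 0; i < messages; i++) {
                // Stand-in for producer.send(msg, callback): the callback runs on another thread
                callbackPool.execute(() -> {
                    succeeded.incrementAndGet();
                    latch.countDown();
                });
            }
            // Block once, after all sends were issued, with an upper bound on the wait
            boolean allDone = latch.await(5, TimeUnit.SECONDS);
            return allDone ? succeeded.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAllAndWait(10)); // 10
    }
}
```

The latch count needs to equal the number of messages, and both the success and the failure callback should count down, so that a failed send does not leave the test waiting for the full timeout.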
3. One-way send
@Test
void oneWayT() throws RemotingException, InterruptedException, MQClientException {
    for (int i = 0; i < 10; i++) {
        final int index = i;
        Message msg = new Message("test-demo",
                "111",
                ("hello rocketMq by OneWay" + index).getBytes(StandardCharsets.UTF_8)
        );
        // sendOneway returns nothing: there is no send result and no callback
        producer.sendOneway(msg);
    }
}
The producer prints no output.
Consumer window



