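1. Preface
Our company is currently phasing out RabbitMQ in favor of RocketMQ across its projects, so I took some time to collect the RocketMQ usage patterns I rely on most and turned them into a demo. In my view, RocketMQ's advantages over RabbitMQ are that it handles message backlogs very well and that ordered message processing is particularly simple to write.
For a deeper look, read the Chinese documentation on GitHub:
https://github.com/apache/rocketmq/tree/master/docs/cn
2. Demo
This demo is based on RocketMQ 4.8.0 and rocketmq-spring-boot-starter 2.2.0. It tests several messaging modes: normal, multi-tag, ordered, transactional, and send-and-receive (request-reply).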
2.1 Constant definitions
public class TestMQConstant {
    // destination format: "topic:tag"
    public static final String FORMAT = "%s:%s";
    // separator between tags in a selector expression
    public static final String SYMBOL = "||";
    // topic
    public static final String TOPIC = "testTopic";
    // tag for normal messages
    public static final String TAG_NORMAL = "normal";
    // consumer group for normal messages
    public static final String CONSUMER_NORMAL = "consumerNormal";
    // tag for ordered messages
    public static final String TAG_ORDERLY = "orderly";
    // consumer group for ordered messages
    public static final String CONSUMER_ORDERLY = "consumerOrderly";
    // tags for a consumer that receives several tags
    public static final String TAG_MULTI_FIRST = "multiFirst";
    public static final String TAG_MULTI_SECOND = "multiSecond";
    public static final String TAG_MULTI = TAG_MULTI_FIRST + SYMBOL + TAG_MULTI_SECOND;
    // consumer group for the multi-tag consumer
    public static final String CONSUMER_MULTI = "consumerMulti";
    // tag for send-and-wait-for-reply messages
    public static final String TAG_SEND_AND_RECEIVE = "sendAndReceive";
    // consumer group for send-and-wait-for-reply messages
    public static final String CONSUMER_SEND_AND_RECEIVE = "consumerSendAndReceive";
    // tag for transactional messages
    public static final String TAG_TRANSACTION = "transaction";
    // consumer group for transactional messages
    public static final String CONSUMER_TRANSACTION = "consumerTransaction";
    // share key (hash key) of the first ordered-message group
    public static final String SHARER_KEY_QUEUE_FIRST = "queueFirst";
    // share key (hash key) of the second ordered-message group
    public static final String SHARER_KEY_QUEUE_SECOND = "queueSecond";

    private TestMQConstant() {
    }
}
2.2 Message entity classes
public class TestMessage {
    private String field1;
    private String field2;
    private Boolean flag;
    private String shareKey;

    public String getShareKey() {
        return shareKey;
    }

    public void setShareKey(String shareKey) {
        this.shareKey = shareKey;
    }

    public String getField1() {
        return field1;
    }

    public void setField1(String field1) {
        this.field1 = field1;
    }

    public String getField2() {
        return field2;
    }

    public void setField2(String field2) {
        this.field2 = field2;
    }

    public Boolean getFlag() {
        return flag;
    }

    public void setFlag(Boolean flag) {
        this.flag = flag;
    }

    @Override
    public String toString() {
        return "TestMessage{" +
                "field1='" + field1 + '\'' +
                ", field2='" + field2 + '\'' +
                ", flag=" + flag +
                ", shareKey='" + shareKey + '\'' +
                '}';
    }
}

public class TestReplyMessage {
    private String message;
    private int code;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    @Override
    public String toString() {
        return "TestReplyMessage{" +
                "message='" + message + '\'' +
                ", code=" + code +
                '}';
    }
}
2.3 Configuration files
2.3.1 Producer
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=test-producer-group
rocketmq.producer.access-key=
rocketmq.producer.secret-key=
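<project>
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.7</version>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>TestRocketMQProducer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>TestRocketMQProducer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.restdocs</groupId>
            <artifactId>spring-restdocs-mockmvc</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.asciidoctor</groupId>
                <artifactId>asciidoctor-maven-plugin</artifactId>
                <version>1.5.8</version>
                <executions>
                    <execution>
                        <id>generate-docs</id>
                        <phase>prepare-package</phase>
                        <goals><goal>process-asciidoc</goal></goals>
                        <configuration>
                            <backend>html</backend>
                            <doctype>book</doctype>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.2.5.RELEASE</version>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>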
2.3.2 Consumer
# rocketmq
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.access-key=
rocketmq.producer.secret-key=
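<project>
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.5.RELEASE</version>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>TestRocketMQConsumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>TestRocketMQConsumer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.restdocs</groupId>
            <artifactId>spring-restdocs-mockmvc</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.asciidoctor</groupId>
                <artifactId>asciidoctor-maven-plugin</artifactId>
                <version>1.5.8</version>
                <executions>
                    <execution>
                        <id>generate-docs</id>
                        <phase>prepare-package</phase>
                        <goals><goal>process-asciidoc</goal></goals>
                        <configuration>
                            <backend>html</backend>
                            <doctype>book</doctype>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>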
3. Testing each messaging mode
3.1 Normal messages
3.1.1 Producer
@Resource
private RocketMQTemplate rocketMQTemplate;

@Test
void sendNormalMessage() {
    // build ten messages, numbered 0..9 in field1
    List<TestMessage> dataList = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        TestMessage testQueue1TestMessage1 = new TestMessage();
        testQueue1TestMessage1.setField1(String.valueOf(i));
        dataList.add(testQueue1TestMessage1);
    }
    // destination has the form "topic:tag"
    String destination = formatDestinationByTag(TAG_NORMAL);
    for (TestMessage testMessage : dataList) {
        rocketMQTemplate.syncSend(destination, testMessage);
    }
}
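The producer tests call a `formatDestinationByTag` helper that the post never shows. Since rocketmq-spring destinations take the form `topic:tag`, a minimal sketch of what it probably does, assuming it simply applies `FORMAT` to `TOPIC` and the tag (the class name `DestinationSketch` and the method body are my assumptions, not the author's code):

```java
public class DestinationSketch {
    // Values copied from TestMQConstant in section 2.1.
    static final String FORMAT = "%s:%s";
    static final String TOPIC = "testTopic";

    // Hypothetical reconstruction of the helper: builds "topic:tag".
    static String formatDestinationByTag(String tag) {
        return String.format(FORMAT, TOPIC, tag);
    }

    public static void main(String[] args) {
        // prints "testTopic:normal"
        System.out.println(formatDestinationByTag("normal"));
    }
}
```

Keeping the format in one place means every test builds its destination the same way, so changing the topic only touches the constants class.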
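3.1.2 Consumer
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        topic = TestMQConstant.TOPIC,
        consumerGroup = TestMQConstant.CONSUMER_NORMAL,
        selectorExpression = TestMQConstant.TAG_NORMAL,
        consumeMode = ConsumeMode.CONCURRENTLY)
@Slf4j
public class TesNormalConsumer implements RocketMQListener<TestMessage> {
    @Override
    public void onMessage(TestMessage testMessage) {
        log.info("Entered [normal consumer]: {}", testMessage);
    }
}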
3.2 Ordered messages
3.2.1 Producer
@Test
void sendOrderlyMessage() throws InterruptedException {
    List<TestMessage> queue1 = new ArrayList<>();
    List<TestMessage> queue2 = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        // queue1
        TestMessage testQueue1TestMessage1 = new TestMessage();
        testQueue1TestMessage1.setField1(String.valueOf(i));
        testQueue1TestMessage1.setShareKey(SHARER_KEY_QUEUE_FIRST);
        queue1.add(testQueue1TestMessage1);
        // queue2
        TestMessage testQueue1TestMessage2 = new TestMessage();
        testQueue1TestMessage2.setField2(String.valueOf(i));
        testQueue1TestMessage2.setShareKey(SHARER_KEY_QUEUE_SECOND);
        queue2.add(testQueue1TestMessage2);
    }
    String destination = formatDestinationByTag(TAG_ORDERLY);
    // the share key is passed as the hash key, so each group stays in one queue
    Thread thread1 = new Thread(() -> {
        for (TestMessage testMessage : queue1) {
            rocketMQTemplate.syncSendOrderly(destination, testMessage, testMessage.getShareKey());
        }
    });
    Thread thread2 = new Thread(() -> {
        for (TestMessage testMessage : queue2) {
            rocketMQTemplate.syncSendOrderly(destination, testMessage, testMessage.getShareKey());
        }
    });
    // simulate sending from multiple threads
    thread1.start();
    thread2.start();
    thread1.join();
    thread2.join();
}
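`syncSendOrderly` takes a hash key as its third argument, and messages that share a key are routed to the same queue, which is what lets an ORDERLY consumer see them in send order. The sketch below models that routing idea with a plain hash-modulo. It is an illustration rather than the library's selector code, and both the class name `HashKeyQueueSketch` and the queue count of 4 are assumptions:

```java
public class HashKeyQueueSketch {
    // Maps a hash key to a queue index; floorMod keeps the result non-negative
    // even when hashCode() is negative.
    static int selectQueueIndex(String hashKey, int queueCount) {
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues on the topic
        for (String key : new String[]{"queueFirst", "queueSecond"}) {
            // The same key always yields the same index, so its messages stay in order.
            System.out.println(key + " -> queue " + selectQueueIndex(key, queueCount));
        }
    }
}
```

Two different keys may still land on the same queue. Ordering is guaranteed per key, not isolation between keys.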
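3.2.2 Consumer
import java.util.Random;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        topic = TestMQConstant.TOPIC,
        consumerGroup = TestMQConstant.CONSUMER_ORDERLY,
        selectorExpression = TestMQConstant.TAG_ORDERLY,
        consumeMode = ConsumeMode.ORDERLY)
@Slf4j
public class TestOrderlyConsumer implements RocketMQListener<TestMessage> {
    private static final Random random = new Random();

    @SneakyThrows
    @Override
    public void onMessage(TestMessage testMessage) {
        log.info("Entered [ordered consumer]: {}", testMessage);
        String shareKey = testMessage.getShareKey();
        if (TestMQConstant.SHARER_KEY_QUEUE_FIRST.equals(shareKey) || TestMQConstant.SHARER_KEY_QUEUE_SECOND.equals(shareKey)) {
            // sleep 1000-1099 ms to show that order holds despite slow processing
            int randomInt = random.nextInt(100) + 1000;
            log.info("Sleeping for a random interval: {}", randomInt);
            Thread.sleep(randomInt);
        }
        log.info("Sleep finished: {}", testMessage);
    }
}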
3.3 One consumer, multiple tags
3.3.1 Producer
@Test
void sendMultiTagToSameConsumerMessage() {
    // tag1
    TestMessage testQueue1TestMessage1 = new TestMessage();
    testQueue1TestMessage1.setField1(TAG_MULTI_FIRST);
    // tag2
    TestMessage testQueue1TestMessage2 = new TestMessage();
    testQueue1TestMessage2.setField1(TAG_MULTI_SECOND);
    String destination1 = formatDestinationByTag(TAG_MULTI_FIRST);
    String destination2 = formatDestinationByTag(TAG_MULTI_SECOND);
    rocketMQTemplate.syncSend(destination1, testQueue1TestMessage1);
    rocketMQTemplate.syncSend(destination2, testQueue1TestMessage2);
}
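3.3.2 Consumer
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        topic = TestMQConstant.TOPIC,
        consumerGroup = TestMQConstant.CONSUMER_MULTI,
        selectorExpression = TestMQConstant.TAG_MULTI,
        consumeMode = ConsumeMode.CONCURRENTLY)
@Slf4j
public class TesMultiNormalConsumer implements RocketMQListener<TestMessage> {
    @Override
    public void onMessage(TestMessage testMessage) {
        log.info("Entered [multi-tag consumer]: tag {}", testMessage.getField1());
    }
}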
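The `TAG_MULTI` selector expression joins tags with `||`, meaning the consumer accepts a message carrying any one of them. As a rough model of that matching rule (the real filtering happens inside RocketMQ; `TagExpressionSketch` and `matches` are hypothetical names used only for illustration):

```java
import java.util.HashSet;
import java.util.Set;

public class TagExpressionSketch {
    // Returns true when the tag is accepted by a "tagA||tagB" style expression.
    // A null expression or "*" is treated as "accept every tag".
    static boolean matches(String expression, String tag) {
        if (expression == null || "*".equals(expression.trim())) {
            return true;
        }
        Set<String> accepted = new HashSet<>();
        for (String part : expression.split("\\|\\|")) {
            accepted.add(part.trim());
        }
        return accepted.contains(tag);
    }

    public static void main(String[] args) {
        String expression = "multiFirst||multiSecond"; // value of TAG_MULTI
        System.out.println(matches(expression, "multiFirst"));  // prints "true"
        System.out.println(matches(expression, "normal"));      // prints "false"
    }
}
```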
3.4 Transactional messages
3.4.1 Producer
@Test
void senTransactionMessage() throws InterruptedException {
    TestMessage success = new TestMessage();
    success.setField1(TAG_TRANSACTION);
    success.setFlag(true);
    senTransactionMessage(success);
    TestMessage fail = new TestMessage();
    fail.setField1(TAG_TRANSACTION);
    fail.setFlag(false);
    senTransactionMessage(fail);
    // the original set these fields on `success` by mistake
    TestMessage unknown = new TestMessage();
    unknown.setField1(TAG_TRANSACTION);
    unknown.setFlag(null);
    senTransactionMessage(unknown);
    // keep the test alive long enough for the broker's transaction check
    Thread.sleep(30000);
}

private void senTransactionMessage(TestMessage testMessage) {
    String destination = formatDestinationByTag(TAG_TRANSACTION);
    Message<TestMessage> build = MessageBuilder.withPayload(testMessage).build();
    // the flag is passed as the arg handed to the local transaction listener
    TransactionSendResult transactionSendResult = rocketMQTemplate.sendMessageInTransaction(destination, build, testMessage.getFlag());
    System.out.println(transactionSendResult.toString());
}
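The third argument of `sendMessageInTransaction` is handed to the producer's local transaction listener, which the post does not show. In rocketmq-spring that listener implements `RocketMQLocalTransactionListener` and returns a `RocketMQLocalTransactionState` of `COMMIT`, `ROLLBACK`, or `UNKNOWN`. Judging by the variable names `success`, `fail`, and `unknown`, the flag presumably maps onto those three states. A library-free sketch of that mapping, where the enum `LocalState` and method `fromFlag` are stand-ins I made up:

```java
public class TransactionStateSketch {
    // Stand-in for RocketMQLocalTransactionState, to keep the sketch dependency-free.
    enum LocalState { COMMIT, ROLLBACK, UNKNOWN }

    // Assumed mapping: true -> commit, false -> roll back, null -> undecided.
    static LocalState fromFlag(Boolean flag) {
        if (flag == null) {
            return LocalState.UNKNOWN;
        }
        return flag ? LocalState.COMMIT : LocalState.ROLLBACK;
    }

    public static void main(String[] args) {
        System.out.println(fromFlag(true));   // prints "COMMIT"
        System.out.println(fromFlag(false));  // prints "ROLLBACK"
        System.out.println(fromFlag(null));   // prints "UNKNOWN"
    }
}
```

Only a committed message becomes visible to the consumer. An undecided one makes the broker ask the producer again later, which is likely why the test sleeps for 30 seconds before exiting.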
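3.4.2 Consumer
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        topic = TestMQConstant.TOPIC,
        consumerGroup = TestMQConstant.CONSUMER_TRANSACTION,
        selectorExpression = TestMQConstant.TAG_TRANSACTION,
        consumeMode = ConsumeMode.CONCURRENTLY)
@Slf4j
public class TesTransactionConsumer implements RocketMQListener<TestMessage> {
    @Override
    public void onMessage(TestMessage message) {
        // Transactional messages are transparent to the consumer; the mechanism
        // concerns the producer's local transaction plus persistence of the message.
        log.info("Entered [transaction consumer]: {}", message);
    }
}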
3.5 Send-and-receive (request-reply) mode
3.5.1 Producer
@Test
void sendAndReceiver() {
    TestMessage testEntityTestMessage = new TestMessage();
    testEntityTestMessage.setField1(TAG_SEND_AND_RECEIVE);
    String destination = formatDestinationByTag(TAG_SEND_AND_RECEIVE);
    // block for up to 6000 ms waiting for the consumer's reply
    TestReplyMessage testReplyMessage = rocketMQTemplate.sendAndReceive(destination, testEntityTestMessage, TestReplyMessage.class, 6000);
    System.out.println(testReplyMessage);
}
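3.5.2 Consumer
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
        topic = TestMQConstant.TOPIC,
        consumerGroup = TestMQConstant.CONSUMER_SEND_AND_RECEIVE,
        selectorExpression = TestMQConstant.TAG_SEND_AND_RECEIVE,
        consumeMode = ConsumeMode.CONCURRENTLY)
@Slf4j
public class TesSendAndReceiverConsumer implements RocketMQReplyListener<TestMessage, TestReplyMessage> {
    @SneakyThrows
    @Override
    public TestReplyMessage onMessage(TestMessage message) {
        log.info("Entered [send-and-reply consumer]: {}", message);
        log.info("Consuming... loading...");
        // sleep before replying
        Thread.sleep(3000);
        TestReplyMessage testReplyMessage = new TestReplyMessage();
        testReplyMessage.setMessage("Consumed successfully");
        testReplyMessage.setCode(0);
        return testReplyMessage;
    }
}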
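In this mode the producer blocks until the reply arrives or the timeout expires. Here the consumer takes about 3000 ms and the producer waits up to 6000 ms, so the reply makes it back in time. The following library-free sketch imitates that interplay with a `CompletableFuture`. It only models the timing behaviour and is not how RocketMQ implements request-reply; `RequestReplySketch` and `request` are hypothetical names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RequestReplySketch {
    // Simulates a consumer that needs workMillis to reply while the caller
    // waits at most timeoutMillis.
    static String request(long workMillis, long timeoutMillis) {
        CompletableFuture<String> reply = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(workMillis); // pretend to process the message
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "consumed successfully";
        });
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Same 1:2 ratio as the demo (3000 ms of work, 6000 ms timeout), scaled down.
        System.out.println(request(30, 600));
        // A consumer slower than the timeout leaves the caller without a reply.
        System.out.println(request(300, 60));
    }
}
```

When choosing the timeout, leave headroom above the consumer's worst-case processing time, since network and queueing delays count against it as well.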
4. Download link for the complete Maven project
https://download.csdn.net/download/xuexi_deng/54839349