本文章的目的不在于讲解RocketMQ相关的基础知识点,此文章主要提供源码,帮助那些想要快速搭建一个入门级RocketMQ的初学者的同学。相关源码可参考如下:
项目结构: 建立父工程,pom文件导入依赖:模块(demo-api):定义相关的pojo类和接口类 1)首先引入pom依赖(demo-api):4.0.0 org.cainiao RocketDemopom 1.0-SNAPSHOT demo-api demo-producer demo-Consumer demo-Consumer-Slave org.springframework.boot spring-boot-starter-parent2.6.3 org.springframework.boot spring-boot-starterorg.springframework.boot spring-boot-starter-testtest
2)模块(demo-api)项目结构: 3)ISender简单接口定义:RocketDemo org.cainiao 1.0-SNAPSHOT 4.0.0 demo-apiorg.springframework.boot spring-boot-starterorg.springframework.boot spring-boot-starter-testtest
package com.cainiao.message;
import com.cainiao.pojo.Student;
public interface ISender {
public String demoSender(Student student);
public void templateSender(Student student);
}
4)Pojo类相关定义:
package com.cainiao.pojo;
import java.io.Serializable;
public class Student implements Serializable {
private String name;
private Integer age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}
模块(demo-producer):消息生产者
1)首先引入pom依赖(demo-producer):
2)模块(demo-producer)项目结构: 3)生产者启动类:RocketDemo org.cainiao 1.0-SNAPSHOT 4.0.0 demo-producerorg.cainiao demo-api1.0-SNAPSHOT org.apache.rocketmq rocketmq-spring-boot-starter2.2.1 org.springframework.boot spring-boot-starter-actuatororg.projectlombok lombokprovided
package com.cainiao;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoProducer {
public static void main(String[] args) {
SpringApplication.run(DemoProducer.class, args);
}
}
4)发送消息接口实现类:(分别采用默认的DefaultMQProducer和采用RocketMQTemplate进行发送消息,两种方式)
package com.cainiao.service;
import com.alibaba.fastjson.JSON;
import com.cainiao.message.ISender;
import com.cainiao.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class IMessageSender implements ISender {
@Value("${mq.rocket.topic}")
private String topic;
@Value("${mq.rocket.tag}")
private String tag;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public String demoSender(Student student) {
try{
DefaultMQProducer producer = new DefaultMQProducer("demo_producer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message message = new Message("chenke_topic", "message", String.valueOf(System.currentTimeMillis()),
JSON.toJSonString(student).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(message);
log.info("消息发送成功: " + message);
producer.shutdown();
}catch (Exception e){
log.error("消息发送失败");
}
return "消息发送成功";
}
@Override
public void templateSender(Student student) {
try{
Message message = new Message(topic, "tag4", JSON.toJSonString(student).getBytes(RemotingHelper.DEFAULT_CHARSET));
log.info("开始发送消息时间:" + System.currentTimeMillis());
rocketMQTemplate.getProducer().send(message);
log.info("消息发送成功时间:" + System.currentTimeMillis());
}catch (Exception e){
log.error("消息发送失败时间:" + System.currentTimeMillis());
}
}
}
5)配置文件application.properties
# RocketMQ配置 rocketmq.nameServer=127.0.0.1:9876 rocketmq.producer.group=demo_producer # Rocket 相关主题和标签配置 mq.rocket.topic=chenke_topic mq.rocket.tag=message6)采用Junit5编写测试类进行功能测试:
package com.cainiao.service;
import com.cainiao.DemoProducer;
import com.cainiao.message.ISender;
import com.cainiao.pojo.Student;
import io.netty.handler.codec.compression.ZstdEncoder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest(classes = DemoProducer.class)
class IMessageSenderTest {
@Autowired
private ISender sender;
@Test
public void testIMessageSender(){
Student student = new Student();
student.setName("chenke");
student.setAge(25);
String res = sender.demoSender(student);
}
@Test
public void testTemplateSender() {
Student student = new Student();
student.setName("test2");
student.setAge(30);
sender.templateSender(student);
}
}
模块(demo-consumer):Master 主消息消费者
1)首先引入pom依赖(demo-consumer):
2)整体模块结构: 3)Master消费者启动类(此处)RocketDemo org.cainiao 1.0-SNAPSHOT 4.0.0 demo-Consumerorg.cainiao demo-api1.0-SNAPSHOT org.apache.rocketmq rocketmq-spring-boot-starter2.2.1 org.springframework.boot spring-boot-starter-actuatororg.projectlombok lombokprovided
package com.cainiao.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoConsumer {
public static void main(String[] args) {
SpringApplication.run(DemoConsumer.class, args);
}
}
4)指定消费者对某一个Topic进行监听,并设置其消费者组:
package com.cainiao.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@RocketMQMessageListener(topic = "${mq.rocket.topic}",
consumerGroup = "${rocketmq.consumer.group}",
messageModel = MessageModel.BROADCASTING,
selectorexpression = "tag2 || tag4",
selectorType = SelectorType.TAG)
@Component
public class IMessageConsumer implements RocketMQListener {
@Override
public void onMessage(MessageExt messageExt) {
String msgId = messageExt.getMsgId();
String tags = messageExt.getTags();
String keys = messageExt.getKeys();
byte[] bodyB = messageExt.getBody();
String body = new String(bodyB);
log.info("Master已消费: " + body);
}
}
5)配置文件设置application.properties
# RocketMQ配置 (Master) rocketmq.nameServer=127.0.0.1:9876 rocketmq.consumer.group=consumer_group # Rocket 相关主题和标签配置 mq.rocket.topic=chenke_topic # mq.rocket.tag=message1模块(demo-consumer):Slave 从消息消费者 1)首先引入pom依赖(demo-consumer-slave):
2)整体模块结构: 3)Slave 消费者启动类(此处)RocketDemo org.cainiao 1.0-SNAPSHOT 4.0.0 demo-Consumer-Slaveorg.cainiao demo-api1.0-SNAPSHOT org.apache.rocketmq rocketmq-spring-boot-starter2.2.1 org.springframework.boot spring-boot-starter-actuatororg.projectlombok lombokprovided
package com.cainiao.slave.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DemoConsumerSlave {
public static void main(String[] args) {
SpringApplication.run(DemoConsumerSlave.class, args);
}
}
4)指定Slave 从消费者对某一个Topic进行监听,并设置其消费者组:
package com.cainiao.slave.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.springframework.stereotype.Component;
import org.apache.rocketmq.spring.core.RocketMQListener;
@Slf4j
@RocketMQMessageListener(topic = "${mq.rocket.topic}",
consumerGroup = "${rocketmq.consumer.group}",
messageModel = MessageModel.BROADCASTING,
selectorexpression = "tag1 || tag3",
selectorType = SelectorType.TAG)
@Component
public class SlaverConsumer implements RocketMQListener{
@Override
public void onMessage(MessageExt messageExt) {
String msgId = messageExt.getMsgId();
String keys = messageExt.getKeys();
String tags = messageExt.getTags();
byte[] bodyB = messageExt.getBody();
String body = new String(bodyB);
log.info("Slave已消费: " + body);
}
}
5)配置文件设置application.properties
# RocketMQ配置 (Slave) rocketmq.nameServer=127.0.0.1:9876 rocketmq.consumer.group=consumer_group # Rocket 相关主题和标签配置 mq.rocket.topic=chenke_topic # mq.rocket.tag=message2



