RocketMQ最新版本:4.5.1
下载地址
JDK1.8以上
Linux64位系统(CentOS Linux release 7.7.1908)
源码安装需要安装Maven 3.2.x
4G+ free (如果给不到虚拟机4G可用内存 可以修改rocketma配置 下面介绍)
-
下载rocketmq
#下载 wget https://archive.apache.org/dist/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
-
修改脚本
bin/runserver.sh bin/runbroker.sh bin/tools.sh
-
vim runserver.sh
-
vim runbroker.sh
-
vim tools.sh
-
启动NameServer
# 1.启动NameServer mqnamesrv # 2.查看启动日志 tail -f ~/logs/rocketmqlogs/namesrv.log
-
启动Broker
# 1.启动Broker mqbroker -n localhost:9876 # 2.查看启动日志 tail -f ~/logs/rocketmqlogs/broker.log
# 1.设置环境变量 export NAMESRV_ADDR=localhost:9876 # 2.使用安装包的Demo发送消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer2、接收消息
# 1.设置环境变量 export NAMESRV_ADDR=localhost:9876 # 2.接收消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer3、关闭RocketMQ
# 1.关闭NameServer mqshutdown namesrv # 2.关闭Broker mqshutdown broker三、RocketMQ和SpringBoot的整合 1、消息生产者
- 添加依赖
4.0.0 com.ckw.rocket springboot-rocketmq-produer 1.0-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.0.1.RELEASE 2.0.3 org.apache.rocketmq rocketmq-spring-boot-starter ${rocketmq-spring-boot-starter-version} org.projectlombok lombok 1.18.6 org.springframework.boot spring-boot-starter-test test
-
配置文件
# rocketmq的nameserver地址 rocketmq.name-server=IP:9876 # 指定生产组名称 rocketmq.producer.group=producer_grp
-
启动类
@SpringBootApplication
public class MQProducerApplication {
public static void main(String[] args) {
SpringApplication.run(MQSpringBootApplication.class);
}
}
- 测试类
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MyRocketProducerApplication.class})
public class MyRocketProducerApplicationTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void testSendMessage() {
// 用于向broker发送消息
// 第一个参数是topic名称
// 第二个参数是消息内容
this.rocketMQTemplate.convertAndSend(
"tp_springboot",
"springboot: hello ckw"
);
}
@Test
public void testSendMessages() {
for (int i = 0; i < 100; i++) {
// 用于向broker发送消息
// 第一个参数是topic名称
// 第二个参数是消息内容
this.rocketMQTemplate.convertAndSend(
"tp_springboot",
"springboot: hello ckw" + i
);
}
}
}
2、消息消费者
-
添加依赖
同消息生产者 -
配置文件
同消息生产者 -
启动类
同消息生产者 -
消息监听器
@Slf4j @Component @RocketMQMessageListener(topic = "tp_springboot", consumerGroup = "consumer_grp") public class MyRocketListener implements RocketMQListener{ @Override public void onMessage(String message) { // 处理broker推送过来的消息 log.info(message); } }
注意: 可能会启动报错,org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.1:10911> failed
新增配置 vim conf/broker.conf



