部署Rocket首先要部署一个 NameServer
相关介绍参考: https://www.jianshu.com/p/3d8d594d9161
rocket 部署: docker部署
docker-compose
version: "3.7"
# https://github.com/apache/rocketmq-docker
services:
namesrv:
image: apacherocketmq/rocketmq:4.5.0
container_name: namesrv
ports:
- 9876:9876
volumes:
- ./data/namesrv/logs:/home/rocketmq/logs
command: sh mqnamesrv
mqbroker:
image: apacherocketmq/rocketmq:4.5.0
container_name: mqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- ./data/broker/logs:/home/rocketmq/logs
- ./data/broker/store:/home/rocketmq/store
- ./data/broker/conf/broker.conf:/home/rocketmq/rocketmq-4.5.0/conf/broker.conf
command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
depends_on:
- namesrv
broker.conf
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH #set `brokerIP1` if you want to set physical IP as broker IP. #change you own physical IP Address brokerIP1 = localhost # listenPort = 28081 namesrvAddr= localhost:9876 autoCreateTopicEnable=true
然后就是项目文件
消息生产者架构:
pom.xml 文件(生产者和消费者用同一个pom文件)
4.0.0 org.springframework.boot spring-boot-starter-parent 2.3.2.RELEASE com.demo.springcloud_06_rocketmq mq_producer 0.0.1-SNAPSHOT mq_producer 生产者 Hoxton.SR9 2.3.2.RELEASE 2.2.6.RELEASE org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.projectlombok lombok 1.18.0 true org.springframework.boot spring-boot-starter-actuator com.alibaba.cloud spring-cloud-starter-stream-rocketmq org.springframework.boot spring-boot-dependencies ${spring.boot.version} pom import org.springframework.cloud spring-cloud-dependencies ${spring.cloud.version} pom import com.alibaba.cloud spring-cloud-alibaba-dependencies ${spring.cloud.alibaba.version} pom import org.springframework.boot spring-boot-maven-plugin
生产者配置文件
application.yml
server:
port: 8062
spring:
cloud:
stream:
bindings:
topic_channel:
destination: TOPIC_CHANNEL_0
content-type: application/json
# group: topic_group
rocketmq:
binder:
name-server: localhost:9876 #,localhost:9877
bindings:
topic_channel:
producer:
group: topic_group # 生产者分组
# sync: true # 是否同步发送消息,默认为 false 异步。
MqProducerApplication,java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import com.demo.springcloud_06_rocketmq.mq_producer.producer.ProducerChannel;
@SpringBootApplication
@EnableBinding(ProducerChannel.class)
public class MqProducerApplication {
public static void main(String[] args) {
SpringApplication.run(MqProducerApplication.class, args);
}
}
ProducerController.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class ProducerController {
@Autowired
private MessageSender sender;
@GetMapping("/send")
public void testKafkaMessageSend(String message) {
log.info("message:{}",message);
sender.sendToTopicChannel(message);
}
}
ProducerChannel.java
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface ProducerChannel {
String TOPIC_CHANNEL = "topic_channel";
@Output(TOPIC_CHANNEL)
MessageChannel sendTopicChannelMessage();
}
MessageSender.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MessageSender {
@Autowired
private ProducerChannel channel;
public void sendToTopicChannel(String message) {
channel.sendTopicChannelMessage().send(MessageBuilder.withPayload(message).build());
}
}
消息消费者项目架构:
pom.xml文件 同上
application.yml 文件如下
server:
port: 8063
spring:
application:
name: mq_consumer
cloud:
stream:
bindings:
topic_channel:
destination: TOPIC_CHANNEL_0
content-type: application/json
group: topic_group_1
rocketmq:
binder:
name-server: localhost:9876
bindings:
topic_channel:
consumer:
enabled: true # 是否开启消费,默认为 true
broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费
MqConsumerApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import com.demo.springcloud_06_rocketmq.mq_consumer.consumer.ConsumerChannel;
@SpringBootApplication
@EnableBinding(ConsumerChannel.class)
public class MqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(MqConsumerApplication.class, args);
}
}
ConsumerListener.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class ConsumerListener {
@StreamListener(ConsumerChannel.TOPIC_CHANNEL)
public void receive(@Payload Message message){
log.info("{}订阅缺省消息:通道 = {},data = {}", new Date(),"topic_channel", message);
log.info("接收数据: {}",message.getPayload());
}
}
ConsumerChannel.java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface ConsumerChannel {
String TOPIC_CHANNEL = "topic_channel";
@Input(TOPIC_CHANNEL)
SubscribableChannel demo01Input();
}
mq的坑点主要在mq服务器的搭建上,如果配置不正确,消息服务器启动成功,但是接收和发送还会出问题,常见的就是发送成功,但是无法接收



