链接地址:centos8安装rocketMQ_weixin_44919041的博客-CSDN博客
2.pom文件依赖
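<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
        <relativePath/>
    </parent>
    <groupId>com.company</groupId>
    <artifactId>spring-rocktmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-rocktMQ</name>
    <description>spring-rocktMQ</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
2.application.yml配置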
# HTTP port of the embedded web server.
server:
  port: 8888

rocketmq:
  name-server: 121.4.253.22:9876 # RocketMQ name server address
  producer:
    send-message-timeout: 30000
    group: my-group
  # Custom keys read through @Value("${rocketmq.consumer.*}") in Producerservice.
  consumer:
    consumerGroup: string_consumer
    stringTopic: string-topic
    userTopic: user-topic
    tag: tagA
3.java代码实现
1.发送实现类
package com.company.rocktmq.service;
import com.company.rocktmq.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
 * Sends messages to RocketMQ through a {@link RocketMQTemplate}.
 *
 * <p>Topic names and the tag are injected from the {@code rocketmq.consumer.*}
 * configuration properties. Every message is sent synchronously to a destination
 * string built as {@code topic + ":" + tag}.
 */
@Slf4j
@Service
public class Producerservice {

    private final RocketMQTemplate mqTemplate;

    /** Topic for plain string messages ({@code rocketmq.consumer.stringTopic}). */
    @Value("${rocketmq.consumer.stringTopic}")
    private String stringTopic;

    /** Topic for {@link User} messages ({@code rocketmq.consumer.userTopic}). */
    @Value("${rocketmq.consumer.userTopic}")
    private String userTopic;

    /** Tag appended to every destination ({@code rocketmq.consumer.tag}). */
    @Value("${rocketmq.consumer.tag}")
    private String tag;

    /**
     * Creates the service.
     *
     * @param mqTemplate template used for all sends
     */
    public Producerservice(RocketMQTemplate mqTemplate) {
        this.mqTemplate = mqTemplate;
    }

    /**
     * Synchronously sends a string message to the string topic.
     *
     * @param message payload to send
     * @return the result returned by {@code RocketMQTemplate#syncSend}
     */
    public SendResult sendString(String message) {
        // Synchronous send via RocketMQTemplate.syncSend.
        SendResult sendResult = mqTemplate.syncSend(destination(stringTopic), message);
        log.info("stringTopic={}, string-sendResult={}", stringTopic, sendResult);
        return sendResult;
    }

    /**
     * Synchronously sends a {@link User} to the user topic.
     *
     * @param user payload to send
     * @return the result returned by {@code RocketMQTemplate#syncSend}
     */
    public SendResult sendUser(User user) {
        SendResult sendResult = mqTemplate.syncSend(destination(userTopic), user);
        log.info("userTopic={},用户sendResult={}", userTopic, sendResult);
        return sendResult;
    }

    /** Builds the {@code topic:tag} destination string passed to {@code syncSend}. */
    private String destination(String topic) {
        return topic + ":" + tag;
    }
}
2.消费者监听类
1.string消费者监听类
package com.company.rocktmq.service;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * RocketMQ listener for string messages.
 *
 * <p>Subscribes to topic {@code string-topic} with selector expression {@code tagA}
 * in consumer group {@code string-group} and prints each received payload to
 * standard error.
 */
@Service
@RocketMQMessageListener(consumerGroup = "string-group", topic = "string-topic", selectorExpression = "tagA")
public class RocketString implements RocketMQListener<String> {

    /**
     * Prints the received message to standard error.
     *
     * @param message the received string payload
     */
    @Override
    public void onMessage(String message) {
        System.err.println("接收到string消息:" + message);
    }
}
2.user消费者监听类
package com.company.rocktmq.service;

import com.company.rocktmq.entity.User;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * RocketMQ listener for {@link User} messages.
 *
 * <p>Subscribes to topic {@code user-topic} with selector expression {@code tagA}
 * in consumer group {@code user_group} and prints each received user to
 * standard error.
 */
@Service
@RocketMQMessageListener(consumerGroup = "user_group", topic = "user-topic", selectorExpression = "tagA")
public class Rocketuser implements RocketMQListener<User> {

    /**
     * Prints the received user to standard error.
     *
     * @param message the received user payload
     */
    @Override
    public void onMessage(User message) {
        System.err.println("接收到用户消息:" + message);
    }
}
// Section heading of the article that had been fused into the class declaration:
// "3.其他类" (3. Other classes)
1.user类
package com.company.rocktmq.entity;
import lombok.Data;
/**
 * User payload: accepted as the request body of the {@code /producer/user}
 * endpoint and sent to RocketMQ by {@code Producerservice#sendUser}.
 *
 * <p>Accessors and other boilerplate are supplied by Lombok's {@code @Data}.
 */
@Data
public class User {
/** User name. */
private String userName;
/** Age of the user. */
private int age;
}
2.controller类
package com.company.rocktmq.controller;
import com.company.rocktmq.entity.User;
import com.company.rocktmq.service.Producerservice;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
 * REST endpoints under {@code /producer} that publish messages through
 * {@link Producerservice}.
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {

    private final Producerservice producerService;

    /**
     * Creates the controller.
     *
     * @param producerService service that performs the actual sends
     */
    public ProducerController(Producerservice producerService) {
        this.producerService = producerService;
    }

    /**
     * Handles {@code POST /producer/string}: sends the raw request body as a string message.
     *
     * @param message request body to send
     * @return the send result returned by the producer service
     */
    @PostMapping("/string")
    public SendResult sendString(@RequestBody String message) {
        return producerService.sendString(message);
    }

    /**
     * Handles {@code POST /producer/user}: sends the request body, bound to a {@link User}.
     *
     * @param user user bound from the request body
     * @return the send result returned by the producer service
     */
    @PostMapping("/user")
    public SendResult sendUser(@RequestBody User user) {
        return producerService.sendUser(user);
    }
}
4.请求接口
请求结果如下:
消费端监听打印结果如下:
RocketMQ显示如下:
消息如下:
消费者信息详情:



