普通消费者-生产者模式org.springframework.boot spring-boot-dependencies2.6.4 pom import org.springframework.boot spring-boot-starterorg.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-starter-testorg.apache.rocketmq rocketmq-spring-boot-starter2.2.2 org.projectlombok lombok
| 生产者同步发送 | 生产者异步发送 | 生产者单向发送 |
| 要等到有返回消息才能继续发送 | 不必等到返回消息,会另开一条线程等返回消息 | 不会接受返回消息 |
- 同步发送消息
1、生产者
@Component
public class ProducerSimple {
@Autowired
RocketMQTemplate rocketMQTemplate;
public void sendSyncMsg(String topic,String message){
rocketMQTemplate.syncSend(topic,message);
}
}
1.1测试生产者
@SpringBootTest
class ProducerSimpleTest {
@Autowired
ProducerSimple producerSimple;
@Test
public void testSendSyncMsg(){
producerSimple.sendSyncMsg("my-topic","第一条测试测试");
}
}
2、消费者
@Component @RocketMQMessageListener(topic = "my-topic",consumerGroup = "consumer-group") public class ConsumerSimple implements RocketMQListener{ public void onMessage(String s) { //接受到消息,业务逻辑 System.out.println("消费者监听消息:"+s); } }
2.1测试
运行spring入口文件即可
- 异步发送消息
1、生产者
public void sendAsyncMsg(String topic,String message){
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
//发送成功
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功:"+sendResult);
}
//发送失败
public void onException(Throwable throwable) {
System.out.println("发送失败:"+throwable);
}
});
}
1.1生产者测试
@Test
public void testSendAsyncMsg(){
producerSimple.sendAsyncMsg("my-topic","第一条测试测试");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
2、消费者
同上消费者
- 单向发送
生产者
public void sendOneMsg(String topic,String msg){
this.rocketMQTemplate.sendOneWay(topic,msg);
}
- 自定义消息体
1、实体类
@Data
@NoArgsConstructor
@ToString
public class OrderExt implements Serializable {
private String id;
private Date createTime;
private Long money;
private String title;
}
2、生产者
public void sendMyByJson(String topic, OrderExt orderExt){
//同步方法
rocketMQTemplate.convertAndSend(topic,orderExt);
}
3、消费者
@Component @RocketMQMessageListener(topic = "my-topic-obj",consumerGroup = "consumer-group-obj") public class ConsumerSimpleObj implements RocketMQListener延迟发送消息{ public void onMessage(OrderExt orderExt) { System.out.println(orderExt); } }
延迟发送时间有18个等级
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 6h 12h
生产者
同步发送延迟消息:message是spring的
异步发送延迟消息:message是rocketmq的
public void sendDelayMsg(String topic,OrderExt orderExt){
//构建消息体
Message message = MessageBuilder.withPayload(orderExt).build();
rocketMQTemplate.syncSend(topic,message,10000,3);
System.out.println("send message:"+orderExt);
}
@SneakyThrows
public void sendAsyncDelayMsg(String topic, OrderExt orderExt){
String orderStr = JSON.toJSONString(orderExt);
//构建消息体
org.apache.rocketmq.common.message.Message message = new org.apache.rocketmq.common.message.Message(topic, orderStr.getBytes("UTF-8"));
this.rocketMQTemplate.getProducer().send(message, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println("成功:"+sendResult);
}
public void onException(Throwable throwable) {
System.out.println("失败:"+throwable);
}
},10000);
System.out.println("send message:"+message);
}
消息重试
消息重试也是遵循18个等级去重试
MessageExt是来自import org.apache.rocketmq.common.message.MessageExt;
@Component @RocketMQMessageListener(topic = "my-topic-obj",consumerGroup = "consumer-group-obj") public class ConsumerTry implements RocketMQListener其他:{ public void onMessage(MessageExt messageExt) { //获取重试次数 int times = messageExt.getReconsumeTimes(); if (times>3){ //写入数据库的操作 return; } throw new RuntimeException(String.format("尝试第%d次失败",times)); } }
消费者
集群模式:
生产者发送消息到queue只能被1个消费者消费
广播模式:
生产者发送消费到queue能被多个消费者消费



