目录
前言
异步消息
1、引入依赖
2、异步消息与异步延迟消息生产者工具类
3、消息消费者
4、调用
前言
RocketMQ官网:Quick Start - Apache RocketMQ
下载安装:windows下RocketMQ下载安装教程 - darendu - 博客园
分布式事务消息:SpringCloud Alibaba微服务实战三十二 - 集成RocketMQ实现分布式事务_飘渺Jam的博客-CSDN博客
MQ选型:关于ActiveMQ、RocketMQ、RabbitMQ、Kafka一些总结和区别 - Alano的自嘲 - 博客园
异步消息
1、引入依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <!-- 版本号请按项目实际情况指定 -->
</dependency>
2、异步消息与异步延迟消息生产者工具类
package com.szzz.catering.order.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendAsyncMsg (String topic, String tag, T msg){
rocketMQTemplate.asyncSend(topic + ":" + tag,
msg
, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送成功的Topic={},tag={},内容={}", topic ,tag, msg);
}
@Override
public void onException(Throwable throwable) {
log.info("发送失败的Topic={},tag={},内容={}", topic ,tag, msg);
}
});
}
public void sendAsyncDelayMsg(String topic, String tag, T msg, int delayLevel){
Message message= MessageBuilder.withPayload(msg).build();
log.info("发送的Topic={},tag={},内容={}", topic ,tag, msg);
rocketMQTemplate.asyncSend(topic + ":" + tag, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送延时消息成功的Topic={},tag={},内容={}", topic ,tag, msg);
}
@Override
public void onException(Throwable throwable) {
log.info("发送延时消息失败的Topic={},tag={},内容={}", topic ,tag, msg);
}
},3000L,delayLevel);
}
public final static int THIRTY_MINUTES_DELAY_LEVEL=16;
}
3、消息消费者
package com.szzz.catering.order.listener.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for messages on topic {@code order-topic} filtered by the selector
 * expression {@code order-print-tag}, in consumer group {@code order_print_group}.
 * Each received message body is only logged.
 */
@Component
@RocketMQMessageListener(
        topic = "order-topic",
        // The attribute name is case-sensitive: selectorExpression.
        selectorExpression = "order-print-tag",
        consumerGroup = "order_print_group")
@Slf4j
public class PrintListener implements RocketMQListener<String> {

    /**
     * Logs the received message.
     *
     * @param message message body delivered as a {@code String}
     */
    @Override
    public void onMessage(String message) {
        log.info("消费了一条消息>>>>>>>>>>>>>>>>>:{}", message);
    }
}
4、调用
// Send msg asynchronously; the topic and tag are read from mqProperties.
rocketMQProducer.sendAsyncMsg(mqProperties.getTopic(),mqProperties.getTag(),msg);
此时配置文件yaml需配置mqProperties所需要的TOPIC和TAG
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <!-- 版本号请按项目实际情况指定 -->
</dependency>
2、异步消息与异步延迟消息生产者工具类
package com.szzz.catering.order.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendAsyncMsg (String topic, String tag, T msg){
rocketMQTemplate.asyncSend(topic + ":" + tag,
msg
, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送成功的Topic={},tag={},内容={}", topic ,tag, msg);
}
@Override
public void onException(Throwable throwable) {
log.info("发送失败的Topic={},tag={},内容={}", topic ,tag, msg);
}
});
}
public void sendAsyncDelayMsg(String topic, String tag, T msg, int delayLevel){
Message message= MessageBuilder.withPayload(msg).build();
log.info("发送的Topic={},tag={},内容={}", topic ,tag, msg);
rocketMQTemplate.asyncSend(topic + ":" + tag, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送延时消息成功的Topic={},tag={},内容={}", topic ,tag, msg);
}
@Override
public void onException(Throwable throwable) {
log.info("发送延时消息失败的Topic={},tag={},内容={}", topic ,tag, msg);
}
},3000L,delayLevel);
}
public final static int THIRTY_MINUTES_DELAY_LEVEL=16;
}
3、消息消费者
package com.szzz.catering.order.listener.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for messages on topic {@code order-topic} filtered by the selector
 * expression {@code order-print-tag}, in consumer group {@code order_print_group}.
 * Each received message body is only logged.
 */
@Component
@RocketMQMessageListener(
        topic = "order-topic",
        // The attribute name is case-sensitive: selectorExpression.
        selectorExpression = "order-print-tag",
        consumerGroup = "order_print_group")
@Slf4j
public class PrintListener implements RocketMQListener<String> {

    /**
     * Logs the received message.
     *
     * @param message message body delivered as a {@code String}
     */
    @Override
    public void onMessage(String message) {
        log.info("消费了一条消息>>>>>>>>>>>>>>>>>:{}", message);
    }
}
4、调用
// Send msg asynchronously; the topic and tag are read from mqProperties.
rocketMQProducer.sendAsyncMsg(mqProperties.getTopic(),mqProperties.getTag(),msg);
此时配置文件yaml需配置mqProperties所需要的TOPIC和TAG
package com.szzz.catering.order.listener.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for messages on topic {@code order-topic} filtered by the selector
 * expression {@code order-print-tag}, in consumer group {@code order_print_group}.
 * Each received message body is only logged.
 */
@Component
@RocketMQMessageListener(
        topic = "order-topic",
        // The attribute name is case-sensitive: selectorExpression.
        selectorExpression = "order-print-tag",
        consumerGroup = "order_print_group")
@Slf4j
public class PrintListener implements RocketMQListener<String> {

    /**
     * Logs the received message.
     *
     * @param message message body delivered as a {@code String}
     */
    @Override
    public void onMessage(String message) {
        log.info("消费了一条消息>>>>>>>>>>>>>>>>>:{}", message);
    }
}
4、调用
// Send msg asynchronously; the topic and tag are read from mqProperties.
rocketMQProducer.sendAsyncMsg(mqProperties.getTopic(),mqProperties.getTag(),msg);
此时配置文件yaml需配置mqProperties所需要的TOPIC和TAG
此时配置文件yaml需配置mqProperties所需要的TOPIC和TAG



