参考:https://www.cnblogs.com/myseries/p/13153797.html
参考:https://www.cnblogs.com/qdhxhz/p/11109696.html
首先创建一个springboot项目,引入rocketmq的依赖:
org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.3
我们可以定义一个消息体类,用来封装消息。
@Data
public class MessageBody {
// 消息id
private String messageId;
// body组装时间
private long timestamp;
// 来源 附加信息
private String msgSource;
// overload
private Object data;
public MessageBody() {
}
public MessageBody(String msgKey, Object data, String msgSource) {
this.messageId = msgKey;
this.data = data ;
this.msgSource = msgSource;
this.timestamp = System.currentTimeMillis();
}
}
发消息工具类:
@Component
public class MQService {
private final static Logger logger = LoggerFactory.getLogger(MQService.class);
private static enum MSG_TYPE{ ONEWAY, ASYNC, SYNC };
@Autowired
public RocketMQTemplate rocketMQTemplate;
private void sendMsg(MSG_TYPE msg_type, String destination, Object payload, String msgSource){
String msgKey = IdUtils.simpleUUID();
MessageBody msgBody = new MessageBody(msgKey, payload , msgSource);
Message message = MessageBuilder.withPayload(msgBody).setHeader("KEYS",msgKey ).build();
logger.info(String.format("消息发送 MQService 开始: %s %s", destination, message));
SendResult result = null;
switch (msg_type) {
case ONEWAY:
rocketMQTemplate.sendOneWay(destination, message);
break;
case ASYNC:
rocketMQTemplate.asyncSend(destination, message,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(Throwable throwable) {
logger.error("MQService:" + ExceptionUtils.getStackTrace(throwable));
throw new CustomException(String.format("消息发送失败 topic_tag:%s", destination ));
}
});
break;
case SYNC:
result = rocketMQTemplate.syncSend(destination, message);
break;
}
logger.info(String.format("消息发送 MQService 结束: msgId: %s dest: %s msg: %s",result != null ? result.getMsgId() : "", destination, message));
}
public void syncSendMsg(String destination, Object payload, String msgSource){
sendMsg(MSG_TYPE.SYNC,destination, payload,msgSource) ;
}
public void syncSendMsg(String topic, String tag, Object payload, String msgSource){
// 发送的消息体,消息体必须存在
// 业务主键作为消息key
String destination = topic + ":" + tag;
syncSendMsg(destination, payload,msgSource);
}
public void asyncSendMsg(String destination, Object payload, String msgSource){
sendMsg(MSG_TYPE.ASYNC,destination, payload,msgSource);
}
public void asyncSendMsg(String topic, String tag, Object payload, String msgSource){
// 发送的消息体,消息体必须存在
// 业务主键作为消息key
String destination = topic + ":" + tag;
asyncSendMsg(destination, payload,msgSource);
}
public void oneWaySendMsg(String destination, Object payload, String msgSource){
sendMsg(MSG_TYPE.ONEWAY,destination, payload,msgSource);
}
public void oneWaySendMsg(String topic, String tag, Object payload, String msgSource){
// 发送的消息体,消息体必须存在
// 业务主键作为消息key
String destination = topic + ":" + tag;
oneWaySendMsg(destination, payload,msgSource);
}
}
消费者:
@RocketMQMessageListener(topic = "test-topic",nameServer = "${rocketmq.nameServer}",consumerGroup = "${rocketmq.consumer.group}", selectorexpression = "test-tag")
@Component
@Slf4j
public class ComsumerListener implements RocketMQListener {
@Autowired
private ItestService testService;
@Override
public void onMessage(MessageBody messageBody) {
Map map2=JSON.parseObject(JSON.toJSONString(messageBody.getData()),Map.class);
}
}
data的结构生产者和消费者约定好就行了。



