1、pom.xml
org.apache.rocketmq rocketmq-spring-boot-starter2.1.1
2、application.yml
rocketmq:
producer:
group: springboot-rocketmq-group
name-server: 10.16.95.145:9876;10.16.95.146:9876
3、Producer
(1)send
rocketMQTemplate.convertAndSend("topic:tag", entity);
rocketMQTemplate.send("topic:tag", message);
(2)SendOneWay
不等待broker告知是否收到消息,存在消息丢失风险
rocketMQTemplate.sendoneWay("test-mq",message);
//0决定queueId
rocketMQTemplate.sendoneWayOrderly("test-mq", message, "0");
(3)SyncSend
SendResult result = rocketMQTemplate.syncSend("topic", entity, timeout, delayLevel);
SendResult result = rocketMQTemplate.syncSendOrderly("topic", message, "hashkey");
(4)AsyncSend
rocketMQTemplate.asyncSend("topic", message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {}
@Override
public void onException(Throwable e) {}
});
4、Listener
(1)注解
@Service @RocketMQMessageListener(topic = "test-mq", consumerGroup = "test-mq", consumeMode = ConsumeMode.ORDERLY)
(2)Message
public class MqListener implements RocketMQListener{ @Override public void onMessage(MqEntity mqEntity) { } }
(3)Message
public class MqListener implements RocketMQListener{ @Override public void onMessage(Message message) { MqEntity mqEntity = JSONObject.parseObject(message.getBody(),MqEntity.class); //consume } }
5、Message的构造
(1)Message
import org.springframework.messaging.Message; Mapheader = new HashMap<>(); header.put(RocketMQHeaders.KEYS, mqEntity.getId()); Message msg = new GenericMessage<>(mqEntity, header);
(2)Message
import org.apache.rocketmq.common.message.Message;
Message message = new Message("test-mq","tag","key", JSON.toJSonString(mqEntity).getBytes(StandardCharsets.UTF_8));
message.setDelayTimeLevel(3);
(3)MessageBuilder
MessageBuilder.withPayload(mqEntity)
.setHeader("KEYS", mqEntity.getId())
.build()
6、延迟等级与时间
DelayTime:1s,5s,10s,30s,1m,2m,…,9m,10m,20m,30m,1h,2h
DelayLevel:1,2,3,4,5,6,…,13,14, 15,16,17,18



