栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RocketMq

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RocketMq

1、pom.xml


    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.1.1

2、application.yml

rocketmq:
  producer:
    group: springboot-rocketmq-group
  name-server: 10.16.95.145:9876;10.16.95.146:9876

 3、Producer

(1)send

rocketMQTemplate.convertAndSend("topic:tag", entity);
rocketMQTemplate.send("topic:tag", message);

(2)SendOneWay

不等待broker告知是否收到消息,存在消息丢失风险

rocketMQTemplate.sendoneWay("test-mq",message);

//0决定queueId
rocketMQTemplate.sendoneWayOrderly("test-mq", message, "0");

(3)SyncSend

SendResult result = rocketMQTemplate.syncSend("topic", entity, timeout, delayLevel);

SendResult result = rocketMQTemplate.syncSendOrderly("topic", message, "hashkey");

(4)AsyncSend 

rocketMQTemplate.asyncSend("topic", message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {}
    @Override
    public void onException(Throwable e) {}
});

4、Listener

(1)注解

@Service
@RocketMQMessageListener(topic = "test-mq", consumerGroup = "test-mq", consumeMode = ConsumeMode.ORDERLY)

(2)Message

public class MqListener implements RocketMQListener {
    @Override
    public void onMessage(MqEntity mqEntity) {
        
    }
}

 (3)Message

public class MqListener implements RocketMQListener {
    @Override
    public void onMessage(Message message) {
        MqEntity mqEntity = JSONObject.parseObject(message.getBody(),MqEntity.class);
        //consume
    }
}

5、Message的构造

(1)Message

import org.springframework.messaging.Message;

Map header = new HashMap<>();
header.put(RocketMQHeaders.KEYS, mqEntity.getId());
Message msg = new GenericMessage<>(mqEntity, header);

(2)Message

import org.apache.rocketmq.common.message.Message;

Message message = new Message("test-mq","tag","key", JSON.toJSonString(mqEntity).getBytes(StandardCharsets.UTF_8));
message.setDelayTimeLevel(3);

(3)MessageBuilder

MessageBuilder.withPayload(mqEntity)
    .setHeader("KEYS", mqEntity.getId())
    .build()

6、延迟等级与时间

DelayTime:1s,5s,10s,30s,1m,2m,…,9m,10m,20m,30m,1h,2h

DelayLevel:1,2,3,4,5,6,…,13,14, 15,16,17,18

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/281476.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号