栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【Java从0到架构师】RocketMQ 使用 - 集成 SpringBoot

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【Java从0到架构师】RocketMQ 使用 - 集成 SpringBoot

RocketMQ 消息中间件
  • 集成 SpringBoot
    • 入门案例
    • 生产消息类型 - 同步、异步、一次性
    • 消费模式 - 集群、广播
    • 延时消息
    • 设置消息标签
    • 设置消息的 Key
    • 自定义属性设置
    • 消息过滤
    • 发送消息的方式

Java 从 0 到架构师目录:【Java从0到架构师】学习记录

集成 SpringBoot 入门案例

依赖:


	org.apache.rocketmq
	rocketmq-spring-boot-starter
	2.1.0

生产者:

  • 配置文件:
rocketmq.name-server=192.168.52.128:9876
rocketmq.producer.group=my_group
server.port=9999
  • 实现代码:
@RestController
public class HelloController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @RequestMapping("01_hello")
    public String sendMsg(String message) throws Exception{
        SendResult sendResult = rocketMQTemplate.syncSend("01_boot_hello", message);
        return JSON.toJSONString(sendResult);
    }
}

消费者:

  • 配置文件:
rocketmq.name-server=192.168.52.128:9876
server.port=7777
  • 实现代码:
@Component
@RocketMQMessageListener(
        topic = "01_boot_hello",
        consumerGroup = "maoge_consumer"
)
public class HelloConsumer implements RocketMQListener {
    @Override
    public void onMessage(String msg) {
        System.out.println("接收到的消息:" + msg);
    }
}
生产消息类型 - 同步、异步、一次性

同步消息:syncSend

rocketMQTemplate.syncSend("01_boot_hello", message);

异步消息:asyncSend

rocketMQTemplate.asyncSend("02_boot_async", message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("消息发送成功:" + JSON.toJSONString(sendResult));
    }
    @Override
    public void onException(Throwable throwable) {
        System.out.println("消息处理失败:" + throwable.getMessage());
    }
});

一次性消息:sendOneWay

rocketMQTemplate.sendOneWay("03_boot_oneway",message);
消费模式 - 集群、广播

集群模式:

@Component
@RocketMQMessageListener(
        topic = "02_boot_model",
        messageModel = MessageModel.CLUSTERING,
        consumerGroup = "02_boot_cluster"
)
public class ClusterConsumer01 implements RocketMQListener {
    @Override
    public void onMessage(MessageExt message) {
        System.out.println(JSON.toJSONString(message));
    }
}

广播模式:

// 广播模式是实时消费消息的,在广播模式消费者启动之前的消息,无法接收
// 广播模式下发送失败的消息不会重试
@Component
@RocketMQMessageListener(
        topic = "02_boot_model",
        messageModel = MessageModel.BROADCASTING,
        consumerGroup = "02_boot_cluster"
)
public class ClusterConsumer02 implements RocketMQListener {
    @Override
    public void onMessage(MessageExt message) {
        System.out.println(JSON.toJSONString(message));
    }
}
延时消息

使用原生的 Produce 对象:

DefaultMAProducer producer =  rocketMQTemplate.getProducer();
Message message = new Message(topic, "TagA", "9527", msg.getBytes());
message.setDelayTimeLevel(3);
// 在实际工作中,确保消息可靠性,捕获对应的异常
producer.send(message);

使用 Spring 接口:

Message msg = MessageBuilder.withPayload(message).build();
rocketMQTemplate.syncSend("04_boot_delay", msg, 3000, 3);
设置消息标签
// 在发送的消息 Topic:Tag 中间使用冒号隔开
rocketMQTemplate.convertAndSend("01-boot-hello:TagB", message, map);
设置消息的 Key
Message msg = MessageBuilder.withPayload(message)
	.setHeader(MessageConst.PROPERTY_KEYS, "1100").build();
rocketMQTemplate.send("01-boot-hello", msg);
自定义属性设置
// 过滤设置: 需要开启 broker 的支持用户属性配置
// enablePropertyFilter=true

Map map=new HashMap<>();
//用户自定义属性
map.put("name", "hesj");
map.put("age", "18");
//也可以设置系统属性
map.put(MessageConst.PROPERTY_KEYS,age);
rocketMQTemplate.convertAndSend("01-boot-hello:TagB", message, map);
消息过滤
// 在RocketMQMessageListener添加注解
@RocketMQMessageListener(
        consumerGroup = "02_boot_cluster",
        topic = "02_boot_model",
        messageModel = MessageModel.BROADCASTING,
		// 	消息过滤
        selectorType = SelectorType.TAG,
		selectorexpression = "age > 16"
)
发送消息的方式
  • 直接使用 rocketMQTemplate
  • 使用 DefaultMQProducer 对象
  • 使用 Spring 的 Message 接口
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/344912.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号