栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RocketMQ 使用TAG进行消息过滤

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RocketMQ 使用TAG进行消息过滤

        RocketMQ支持Broker端和Consumer端的TAG过滤,Broker端过滤:减少无用消息的网络传输,但是增加了broker端的负担;Consumer端过滤,增加了无用消息的网络传输,但是减少了broker端的负担。SQL过滤,使用了服务端实现,TAG过滤,主要使用消费端实现。SQL过滤:性能低,支持使用SQL语句复杂的过滤逻辑;TAG过滤:性能高,只支持简单的过滤。Broker端过滤:先变量消息队列中TAG与消费者订阅的TAG使用HASHCODE比较,匹配上则把对应的消息发给消费者,消费者再根据TAG内容做进一步确认,然后消费匹配的消息。

TAG过滤:

发送时指定消息的TAG,消费者订阅主题时也指定对应的TAG

  @GetMapping("sendTag")
    Object callbackTag(String text,String tag) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
//        Message message = new Message(JmsConfig.TOPIC, "taga", ("hello word = " + text).getBytes());
        //发送消息时,指定消息的key,比如订单编号
        Message message = new Message(JmsConfig.TOPIC, tag, "666", ("hello word = " + text).getBytes());
//        Message message = new Message(JmsConfig.TOPIC, "", "666", ("hello word = " + text).getBytes());
//        message.putUserProperty("amount",a);
        SendResult sendResult = payProducer.getProducer().send(message);
        System.out.println(sendResult);
        return new HashMap<>();
    }
package com.tech.rocketmq.jms;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.List;


@Slf4j
@Component
public class PayConsumer {
    private DefaultMQPushConsumer consumer;
    private String consumerGroup = "pay_consumer_group";

    public PayConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //默认是集群模式,如果改为广播模式不支持消费端重试
//        consumer.setMessageModel(MessageModel.BROADCASTING);
        // * 订阅Topic下所有的TAG
//        consumer.subscribe(JmsConfig.TOPIC, "*");
        //订阅该TOPIC下,TAG为create或pay的消息
        consumer.subscribe(JmsConfig.TOPIC, "create||pay");
        //SQL过滤  属性在发送消息时使用 message.putUserProperty添加
//        consumer.subscribe(JmsConfig.TOPIC, MessageSelector.bySql("amount > 5"));
        
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                MessageExt message = list.get(0);
                int reconsumeTimes = message.getReconsumeTimes();
                log.info("重试次数:{}", reconsumeTimes);
                try {
                    log.info("Receive New Message: {}", new String(message.getBody()));
                    String topic = message.getTopic();
                    String tags = message.getTags();
                    String keys = message.getKeys();
//                    if(keys.equals("666")){
//                        throw new Exception("模拟异常");
//                    }
                    log.info("topic={} tags={} keys={}", topic, tags, keys);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    log.error("消费异常",e);
                    if(reconsumeTimes>=2){
                        log.info("重试次数大于等于2,记录数据库,发短信通知开发人员或者运营人员");
                        //告诉broker,本次消费成功
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.println("consumer start ...");
    }
}
只有消息tag为create、pay时才被消费

sql过滤

发送消息时指定amount,当大于5时才会被消费

// TAG过滤
    @GetMapping("sendTag")
    Object callbackTag(String text,String a) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
//        Message message = new Message(JmsConfig.TOPIC, "taga", ("hello word = " + text).getBytes());
        //发送消息时,指定消息的key,比如订单编号
//        Message message = new Message(JmsConfig.TOPIC, tag, "666", ("hello word = " + text).getBytes());
        Message message = new Message(JmsConfig.TOPIC, "", "666", ("hello word = " + text).getBytes());
        message.putUserProperty("amount",a);
        SendResult sendResult = payProducer.getProducer().send(message);
        System.out.println(sendResult);
        return new HashMap<>();
    }
package com.tech.rocketmq.jms;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.List;


@Slf4j
@Component
public class PayConsumer {
    private DefaultMQPushConsumer consumer;
    private String consumerGroup = "pay_consumer_group";

    public PayConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //默认是集群模式,如果改为广播模式不支持消费端重试
//        consumer.setMessageModel(MessageModel.BROADCASTING);
        // * 订阅Topic下所有的TAG
//        consumer.subscribe(JmsConfig.TOPIC, "*");
        //订阅该TOPIC下,TAG为create或pay的消息
//        consumer.subscribe(JmsConfig.TOPIC, "create||pay");
        //SQL过滤  属性在发送消息时使用 message.putUserProperty添加
        consumer.subscribe(JmsConfig.TOPIC, MessageSelector.bySql("amount > 5"));
        
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                MessageExt message = list.get(0);
                int reconsumeTimes = message.getReconsumeTimes();
                log.info("重试次数:{}", reconsumeTimes);
                try {
                    log.info("Receive New Message: {}", new String(message.getBody()));
                    String topic = message.getTopic();
                    String tags = message.getTags();
                    String keys = message.getKeys();
//                    if(keys.equals("666")){
//                        throw new Exception("模拟异常");
//                    }
                    log.info("topic={} tags={} keys={}", topic, tags, keys);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    log.error("消费异常",e);
                    if(reconsumeTimes>=2){
                        log.info("重试次数大于等于2,记录数据库,发短信通知开发人员或者运营人员");
                        //告诉broker,本次消费成功
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.println("consumer start ...");
    }
}
amount大于5才会被消费

如果启动时报错:

The broker does not support consumer to filter message by SQL92

需要对RocketMQ服务的配置文件进行配置

解决:broker.conf 里面配置如下
enablePropertyFilter=true

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/572299.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号