栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RocketMQConsumer两种接收消息的方式

RocketMQConsumer两种接收消息的方式

消费者消费消息有两种模式,一种是消费者主动去Broker上拉取消息的拉模式,另一种是消费者等待Broker把消息推送过来的推模式。

拉模式

Consumer主动发送请求到MQ去拉取消息

老版本

需要自己管理Offset偏移量,不建议使用.

package org.apache.rocketmq.example.simple;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;


public class PullConsumer {
    
    private static final Map OFFSE_TABLE = new HashMap();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        consumer.start();
        //从 topic拿的MessageQueues集合 ,这个MessageQueues是生产者发送消息和消费者接收消息订阅的最小单位
        Set mqs = consumer.fetchSubscribeMessageQueues("TopicTest");

        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    //拉取消息
                    PullResult pullResult =
                            
                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        consumer.shutdown();
    }

    
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;

        return 0;
    }

    
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }

}

新版本

优先使用新版本 新版本也可以手动管理偏移量.下面案例是自动偏移量的不需要咱们管理

package org.apache.rocketmq.example.simple;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;


public class LitePullConsumerSubscribe {

    public static volatile boolean running = true;

    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
        litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        litePullConsumer.subscribe("TopicTest", "*");
        litePullConsumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        litePullConsumer.start();
        try {
            //循环获取

            while (running) {
                List messageExts = litePullConsumer.poll();
                //打印拉取结果
                System.out.printf("%s%n", messageExts);
            }
        } finally {
            litePullConsumer.shutdown();
        }
    }
}

推模式

相当于被动的方式,由Broker收到消息之后主动推送给消费者.

package org.apache.rocketmq.example.simple;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;


public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");

        consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setConsumeTimestamp("20181109221800");
        
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                //这里是执行业务逻辑
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 返回给Broker状态, 表明消费者消费是否成功
                // CONSUME_SUCCESS: 表示消费成功
                // RECONSUME_LATER:表示消费失败,重新消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

输出:

ConsumeMessageThread_6 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=3, storeSize=214, queueOffset=396, sysFlag=0, bornTimestamp=1634462026024, bornHost=/172.16.10.1:58143, storeTimestamp=1634462054202, storeHost=/172.16.10.103:10911, msgId=AC100A6700002A9F000000000009AA51, commitLogOffset=633425, bodyCRC=198614610, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=397, KEYS=OrderID188, CONSUME_START_TIME=1634462026031, UNIQ_KEY=AC100A0130FC18B4AAC2561831280001, CLUSTER=rocketmq-cluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100], transactionId='null'}]] 
两者区别

拉模式是主动 , Consumer主动去拉消息
推模式是被动, Consumer被动的接收消息.

通常情况下,用推模式比较简单。

实际上RocketMQ的推模式也是由拉模式封装出来的。

4.7.1版本中DefaultMQPullConsumerImpl这个消费者类已标记为过期,但是还是可以使用的。替换的类是DefaultLitePullConsumerImpl。

生产者代码
package org.apache.rocketmq.example.simple;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;


public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        //NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
        producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
        producer.start();

        for (int i = 0; i < 20; i++)
            try {
                {
                    Message msg = new Message("TopicTest", // 发送的topic
                            "TagA",  //tags
                            "OrderID188", // keys3
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
                    );
                    //同步传递消息,消息会发给集群中的一个Broker节点。
                    //这个发送方法是void方法,说明这个消息发送过去了之后,Producer是不知道的
                    //不知道消息是否发送成功,反正Producer发送完了就不管了 .
                    producer.sendOneway(msg);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/344700.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号