栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

二、RocketMQ常用消息实战

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

二、RocketMQ常用消息实战

准备 Rocket部署 下载源码并构建

下载地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.3/rocketmq-all-4.9.3-source-release.zip

> unzip rocketmq-all-4.9.3-source-release.zip
> cd rocketmq-all-4.9.3/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/rocketmq-4.9.3/rocketmq-4.9.3
启动NameServer
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
启动Broker
> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log 
The broker[%s, 172.30.30.233:10911] boot success...
测试消息的发送和消费
> export NAMESRV_ADDR=localhost:9876
 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...

 > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...
停止服务

如果需要停止服务,执行如下命令

> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
部署RocketMQ Dashbord

为了方便管理,我们还需要Dashbord
下载地址:https://gitee.com/nswish/rocketmq-dashboard/repository/archive/master.zip

修改配置文件application.properties,增加配置rocketmq.config.namesrvAddr地址

# 省略...
rocketmq.config.namesrvAddr=localhost:9876
# 省略...

构建并启动

# Maven spring-boot run
mvn spring-boot:run

# 或 Maven build and run
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar

访问:http://localhost:8080。
至此,RocketMQ已经部署完成。

项目中导入依赖

  org.apache.rocketmq
  rocketmq-client 
  4.9.3

普通消息 消息发送

消息发送的步骤:

    创建消息生产者 producer,并指定生产者组名指定 Nameserver 地址启动 producer创建消息对象,指定 Topic、Tag 和消息体发送消息关闭生产者 producer
同步发送

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知等。

代码示例:

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("test_group");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" ,
                    "TagA" ,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) 
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}
异步发送

异步消息通常用在对响应时间敏感的业务场景。

示例代码:

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("test_group");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
        //启用Broker故障延迟机制
        producer.setSendLatencyFaultEnable(true);

        for (int i = 0; i < 100; i++) {
            final int index = i;
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest", "TagA", "OrderID888",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback接收异步返回结果的回调
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(10000);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

单向发送

这种方式主要用在不需要关心发送结果的场景,例如日志发送。

示例代码:

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer   对象。
        DefaultMQProducer producer = new DefaultMQProducer("test_group");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 20; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" ,
                    "TagA" ,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) 
            );
            // 发送单向消息,没有任何返回结果
            producer.sendOneway(msg);

        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}
消息发送的权衡
发送方式发送TPS发送结果反馈可靠性适用场景
同步发送可靠适用广泛,如重要的消息通知,短信通知等。
异步发送可靠对响应时间敏感的应用场景
单向发送最快不 可靠可靠性要求不高的场景,如日志采集
消息消费
    创建消费者 Consumer,指定消费者组名指定 Nameserver 地址订阅主题 Topic 和 Tag设置回调函数,处理消息启动消费者 consumer
集群消费


一个 Consumer Group 中的各个 Consumer 实例分摊去消费消息,即一条消息只会投递到一个 Consumer Group 下面的一个实例。实际上,每个 Consumer 是平均分摊 Message Queue 的。例如,一个 Topic 有3个 Queue,其中一个Consumer Group 有3个实例,那么每个实例只消费其中一个Queue。
这种模式下,消费进度(Consumer Offset)的存储会持久化到 Broker。

代码示例,启动同一分组下的两个消费者

public class BalanceComuser {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅Topic
        consumer.setMaxReconsumeTimes(1);
        consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC
        //负载均衡模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
广播消费


消息将对一个 Consumer Group 下的各个 Consumer 实例都投递一遍。实际上,是一个消费组下的每个消费者实例都获取到了 topic 下面的每个 Message Queue 去拉取消费。
这种模式下,消费进度(Consumer Offset)会存储持久化到实例本地。

代码示例,启动统一分组下的两个消费者

public class BroadcastComuser {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅Topic
        consumer.subscribe("TopicTest", "*");
        //广播模式消费
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 如果非第一次启动,那么按照上次消费的位置继续消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
消息消费的权衡 集群模式
    消费端集群化部署,每条消息只需要被处理一次。由于消费进度在服务端维护,可靠性更高。集群消费模式下,每一条消息都只会被分发到一台机器上处理。
广播模式
    每条消息都需要被相同逻辑的多台机器处理。消费进度在客户端维护,出现重复的概率稍大于集群模式。不支持顺序消息、不支持重置消费位点。广播模式下, RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。
顺序消息

在默认的情况下,消息发送会采取轮询方式把消息发送到不同的 queue;而消费消息的时候是从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序的。

但是如果控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序。当发送和消费参与的 queue 只有一个,则是全局有序;如果多个 queue 参与,则为分区有序,即相对每个 queue,消息都是有序的。
全局顺序消息:

分区顺序消息:

一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,下面是订单进行分区有序的示例代码。

public class ProducerInOrder {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String[] tags = new String[]{"TagA", "TagC", "TagD"};
        // 订单列表
        List orderList = new ProducerInOrder().buildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        for (int i = 0; i < orderList.size(); i++) {
            // 加个时间前缀
            String body = dateStr + " Order:" + orderList.get(i);
            Message msg = new Message("PartOrder", tags[i % tags.length], "KEY" + i, body.getBytes());

            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List mqs, Message msg, Object arg) {
                    Long id = (Long) arg;  //根据订单id选择发送queue
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId());//订单id

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }

        producer.shutdown();
    }

    
     @Getter
     @Setter
     @ToString
    private static class Order {
        private long orderId;
        private String desc;
    }

    
    private List buildOrders() {
        List orderList = new ArrayList();

        Order orderDemo = new Order();
        orderDemo.setOrderId(20210406001L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406002L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406001L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406003L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406002L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406003L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406002L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406003L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406002L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406001L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406003L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new Order();
        orderDemo.setOrderId(20210406001L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

运行结果:

SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='创建'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='创建'}
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='付款'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='创建'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='付款'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='付款'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='推送'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='推送'}
SendResult status:SEND_OK, queueId:2, body:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='完成'}
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='推送'}
SendResult status:SEND_OK, queueId:3, body:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='完成'}
SendResult status:SEND_OK, queueId:1, body:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='完成'}

使用顺序消息时,首先要保证消息是有序进入 MQ 的,对 id 等关键字进行取模后,放入指定 messageQueue中,Consume 消费消息失败时, 不能返回 reconsume_later,这样会导致乱序,应该返回 suspend_current_queue_a_moment。

消费时,同一个 OrderId 获取到的肯定是同一个队列。从而确保一个订单中处理的顺序。

public class ConsumerInOrder {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("PartOrder", "TagA || TagC || TagD");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }
                try {
                    //模拟业务逻辑处理中...
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
                } catch (Exception e) {
                    e.printStackTrace();
                    //这个点要注意:意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

运行结果:

consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='创建'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='创建'}
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='付款'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='创建'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='付款'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='付款'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='推送'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='推送'}
consumeThread=ConsumeMessageThread_6queueId=3, content:2022-03-20 21:50:55 Order:Order{orderId=20210406003, desc='完成'}
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='推送'}
consumeThread=ConsumeMessageThread_5queueId=2, content:2022-03-20 21:50:55 Order:Order{orderId=20210406002, desc='完成'}
consumeThread=ConsumeMessageThread_4queueId=1, content:2022-03-20 21:50:55 Order:Order{orderId=20210406001, desc='完成'}
延时消息

Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费, 该消息即延时消息。

消息生产和消费有时间窗口要求;比如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。

Apache RocketMQ 目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化, 那么消息排序要不可避免的产生巨大性能开销。

延迟消息是根据延迟队列的 level 来的,延迟队列默认是msg.setDelayTimeLevel(3)代表延迟 10 秒;、“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”。源码中:org/apache/rocketmq/store/config/MessageStoreConfig.java

生产者示例代码:

public class ScheduledMessageProducer {
    public static void main(String[] args) throws Exception {
        // 实例化一个生产者来产生延时消息
        DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        int totalMessagesToSend = 10;
        for (int i = 0; i < totalMessagesToSend; i++) {
            Message message = new Message("ScheduledTopic", ("Hello scheduled message " + i).getBytes());
            // 设置延时等级3,这个消息将在10s之后投递给消费者(详看delayTimeLevel)
            // delayTimeLevel:(1~18个等级)"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
            message.setDelayTimeLevel(4);
            // 发送消息
            producer.send(message);
        }
        // 关闭生产者
        producer.shutdown();
    }
}

消费者示例代码:

public class ScheduledMessageConsumer {
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅Topics
        consumer.subscribe("ScheduledTopic", "*");
        // 注册消息监听者
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (message.getStoreTimestamp() - message.getBornTimestamp()) + "ms later");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
    }
}
批量消息

批量发送消息能显著提高传递消息的性能。限制是,这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过 4MB。

生产者示例代码:

public class BatchProducer {

    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();

        String topic = "BatchTest";
        List messages = new ArrayList<>();
        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
        try {
            producer.send(messages);
        } catch (Exception e) {
            producer.shutdown();
            e.printStackTrace();
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

消费者示例代码:

public class BatchComuser {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者,指定组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BatchComsuer");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅Topic
        consumer.subscribe("BatchTest", "*");
        //负载均衡模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
批量切分

如果消息的总长度可能大于 4MB 时,这时候需要把消息进行分割。

生产者示例代码:

public class SplitBatchProducer {

    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();

        //large batch
        String topic = "BatchTest";
        List messages = new ArrayList<>(100 * 1000);
        //10万元素的数组
        for (int i = 0; i < 100 * 1000; i++) {
            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
        }

        //把大的消息分裂成若干个小的消息(1M左右)
        ListSplitter splitter = new ListSplitter(messages);
        while (splitter.hasNext()) {
            List listItem = splitter.next();
            producer.send(listItem);
            Thread.sleep(100);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
        System.out.printf("Consumer Started.%n");
    }

}

class ListSplitter implements Iterator> {
    private int sizeLimit = 1000 * 1000;//1M
    private final List messages;
    private int currIndex;

    public ListSplitter(List messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List next() {
        int nextIndex = currIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) {
            Message message = messages.get(nextIndex);
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map properties = message.getProperties();
            for (Map.Entry entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            tmpSize = tmpSize + 20; // 增加日志的开销20字节
            if (tmpSize > sizeLimit) {
                //单个消息超过了最大的限制(1M)
                //忽略,否则会阻塞分裂的进程
                if (nextIndex - currIndex == 0) {
                    //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                    nextIndex++;
                }
                break;
            }
            if (tmpSize + totalSize > sizeLimit) {
                break;
            } else {
                totalSize += tmpSize;
            }

        }
        List subList = messages.subList(currIndex, nextIndex);
        currIndex = nextIndex;
        return subList;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not allowed to remove");
    }
}
过滤消息 Tag 过滤

在大多数情况下,TAG 是一个简单而有用的设计,其可以来选择您想要的消息。

生产者示例代码:

public class TagFilterProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("TagFilterProducer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String[] tags = new String[]{"TagA", "TagB", "TagC"};

        for (int i = 0; i < 60; i++) {
            Message msg = new Message("TagFilterTest",
                    tags[i % tags.length],
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}

消费者示例代码:

public class TagFilterConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilterComsumer");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TagFilterTest", "TagA || TAGB || TAGC");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String msgPro = msg.getProperty("a");

                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : " + msgPro + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

消费者将接收包含 TAGA 或 TAGB 或 TAGC 的消息。但是一个消息只能有一个标签。在这种情况下,可以使用 SQL 表达式筛选消息。SQL 特性可以通过发送消息时的属性来进行计算。

Sql 过滤 SQL 基本语法

RocketMQ 定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,常用的语句如下:
数值比较: 比如:>,>=,<,<=,BETWEEN,=;
字符比较: 比如:=,<>,IN; IS NULL 或者 IS NOT NULL; 逻辑符号:AND,OR,NOT;
常量支持类型为: 数值,比如:123,3.1415; 字符,比如:‘abc’,必须用单引号包裹起来; NULL,特殊的常量;布尔值,TRUE 或 FALSE
生产者示例代码:

public class SqlFilterProducer {

    public static void main(String[] args) throws Exception {

        DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC"};

        for (int i = 0; i < 10; i++) {
            Message msg = new Message("SqlFilterTest",
                tags[i % tags.length],
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // 设置一些属性
            msg.putUserProperty("a", String.valueOf(i));

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }

        producer.shutdown();
    }
}

消费者示例代码:

public class SqlFilterConsumer {

    public static void main(String[] args) throws Exception {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
        consumer.setNamesrvAddr("localhost:9876");
        // Don't forget to set enablePropertyFilter=true in broker
        consumer.subscribe("SqlFilterTest",
                MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                        "and (a is not null and a between 0 and 3)"));

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String msgPro = msg.getProperty("a");

                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,a : " + msgPro + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

如果出现如下报错错误:说明 Sql92 功能没有开启

需要修改 Broker.conf 配置文件。
加入 enablePropertyFilter=true 然后重启 Broker 服务。

事物消息


如图,事务消息分为两个流程:

正常事务消息的发送和提交(1,2,3,4)事务消息的补偿流程(4,6,7) 正常事务流程

    发送半事务消息。服务端响应半事务消息的发送结果。根据发送结果执行本地事务。如果写入失败,此时半消息对业务不可见,本地逻辑不执行。根据本地事务状态执行 Commit 或者 Rollback(Commit 操作生成消息索引,消息对消费者可见)。
事务补偿流程

补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。

    对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次“回查”。Producer 收到回查消息,检查回查消息对应的本地事务的状态。根据本地事务状态,重新 Commit 或者 Rollback。
事务消息状态

事务消息共有三种状态:
TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息。
TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown:它代表需要检查消息队列来确定状态。

使用 TransactionMQProducer 类创建生产者,并指定唯一的 ProducerGroup,通过设置自定义线程池来处理事务回查请求。
执行本地事务后、需要根据执行结果对消息队列进行回复。

生产者示例代码

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //创建事务监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("hzy_produce");
        producer.setNamesrvAddr("localhost:9876");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        //设置生产者回查线程池
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。checkLocalTransaction 方法用于检查本地事务状态。

事务监听器示例代码

public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap localTrans = new ConcurrentHashMap<>();

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
常用的属性和方法 生产者
public class ProducerDetails {
    public static void main(String[] args) throws Exception{
        // producerGroup:生产者所属组(针对 事务消息 高可用)
        DefaultMQProducer producer = new DefaultMQProducer("produce_details");
        // 默认主题在每一个Broker队列数量(对于新创建主题有效)
        producer.setDefaultTopicQueueNums(8);
        // 发送消息默认超时时间,默认3s (3000ms)
        producer.setSendMsgTimeout(1000*3);
        // 消息体超过该值则启用压缩,默认4k
        producer.setCompressMsgBodyOverHowmuch(1024 * 4);
        // 同步方式发送消息重试次数,默认为2,总共执行3次
        producer.setRetryTimesWhenSendFailed(2);
        // 异步方式发送消息重试次数,默认为2,总共执行3次
        producer.setRetryTimesWhenSendAsyncFailed(2);
        // 消息重试时选择另外一个Broker时(消息没有存储成功是否发送到另外一个broker),默认为false
        producer.setRetryAnotherBrokerWhenNotStoreOK(false);
        // 允许发送的最大消息长度,默认为4M
        producer.setMaxMessageSize(1024 * 1024 * 4);

        // 设置NameServer的地址
        producer.setNamesrvAddr("106.55.246.66:9876");//106.55.246.66
        // 启动Producer实例
        producer.start();
        // 0 查找该主题下所有消息队列
        List MessageQueue = producer.fetchPublishMessageQueues("TopicTest");
        for (int i = 0; i < MessageQueue.size(); i++) {
            System.out.println(MessageQueue.get(i).getQueueId());
        }
        for (int i = 0; i < 10; i++) {
            final int index = i;
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest", "TagA", "OrderID888",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

            // 单向发送
            // 1.1发送单向消息
            producer.sendOneway(msg);
            // 1.2指定队列单向发送消息(使用select方法)
            producer.sendOneway(msg,new MessageQueueSelector() {
                @Override
                public MessageQueue select(List mqs, Message msg, Object arg) {
                    return mqs.get(0);
                }
            },null);
            // 1.3指定队列单向发送消息(根据之前查找出来的主题)
            producer.sendOneway(msg,MessageQueue.get(0));


            // 同步发送
            // 2.1同步发送消息
            SendResult sendResult0 = producer.send(msg);
            // 2.1同步超时发送消息(属性设置:sendMsgTimeout 发送消息默认超时时间,默认3s (3000ms) )
            SendResult sendResult1 = producer.send(msg,1000*3);
            // 2.2指定队列同步发送消息(使用select方法)
            SendResult sendResult2 = producer.send(msg,new MessageQueueSelector() {
                @Override
                public MessageQueue select(List mqs, Message msg, Object arg) {
                    return mqs.get(0);
                }
            },null);
            // 2.3指定队列同步发送消息(根据之前查找出来的主题队列信息)
            SendResult sendResult3 = producer.send(msg,MessageQueue.get(0));


            // 异步发送
            // 3.1异步发送消息
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
                }
            });
            // 3.1异步超时发送消息
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
                }
            },1000*3);
            // 3.2选择指定队列异步发送消息(根据之前查找出来的主题队列信息)
            producer.send(msg,MessageQueue.get(0),
                    new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
                }
            });
            // 3.3选择指定队列异步发送消息(使用select方法)
            producer.send(msg,new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List mqs, Message msg, Object arg) {
                            return mqs.get(0);
                        }
                    },
                    new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                        }
                        @Override
                        public void onException(Throwable e) {
                            System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();
                        }
                    });
        }
        Thread.sleep(10000);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}
消费者
public class ComuserDetails {
    public static void main(String[] args) throws Exception {
        // 属性
        // consumerGroup:消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("king");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("106.55.246.66:9876");
        // 消息消费模式(默认集群消费)
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 指定消费开始偏移量(上次消费偏移量、最大偏移量、最小偏移量、启动时间戳)开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // 消费者最小线程数量(默认20)
        consumer.setConsumeThreadMin(20);
        // 消费者最大线程数量(默认20)
        consumer.setConsumeThreadMax(20);
        // 推模式下任务间隔时间(推模式也是基于不断的轮训拉取的封装)
        consumer.setPullInterval(0);
        // 推模式下任务拉取的条数,默认32条(一批批拉)
        consumer.setPullBatchSize(32);
        // 消息重试次数,-1代表16次 (超过 次数成为死信消息)
        consumer.setMaxReconsumeTimes(-1);
        // 消息消费超时时间(消息可能阻塞正在使用的线程的最大时间:以分钟为单位)
        consumer.setConsumeTimeout(15);

        // 获取消费者对主题分配了那些消息队列
        Set MessageQueueSet  = consumer.fetchSubscribeMessageQueues("TopicTest");
        Iterator iterator = MessageQueueSet.iterator();
        while(iterator.hasNext()){
            MessageQueue MessageQueue =(MessageQueue)iterator.next();
            System.out.println(MessageQueue.getQueueId());
        }
        // 方法-订阅
        // 基于主题订阅消息,消息过滤使用表达式
        consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC
        // 基于主题订阅消息,消息过滤使用表达式
        consumer.subscribe("TopicTest",MessageSelector.bySql("a between 0 and 3"));
        // 基于主题订阅消息,消息过滤使用表达式
        consumer.subscribe("TopicTest",MessageSelector.byTag("tagA|TagB"));
        // 取消消息订阅
        consumer.unsubscribe("TopicTest");

        // 注册监听器
        // 注册并发事件监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {
                    for(MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    //没有成功  -- 到重试队列中来
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                //
            }
        });

        // 注册顺序消息事件监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            Random random = new Random();
            @Override
            public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }
                try {
                    //模拟业务逻辑处理中...
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
                } catch (Exception e) {
                    e.printStackTrace();
                    // 这个点要注意:意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        //启动消息者
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/781627.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号