栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

rocketmq 客户端消息生产消费Demo测试

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

rocketmq 客户端消息生产消费Demo测试

rocketmqdemo测试 1.创建工程引入maven依赖

服务端使用的4.9.1,java客户端使用4.5.0的包

    
        
            org.apache.rocketmq
            rocketmq-client
            4.5.0
        
    
2.消息生产者
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
3.消息消费者
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
消息发送 1.发送同步消息
package com.test.mq.rocketmq.base.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.concurrent.TimeUnit;


public class SyncProducer {
//1.创建消息生产者producer,并制定生产者组名
//2.指定Nameserver地址
//3.启动producer
//4.创建消息对象,指定主题Topic、Tag和消息体
//5.发送消息
//6.关闭生产者producer
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 设置NameServer的地址
        producer.setNamesrvAddr("10.211.55.10:9876;10.211.55.8:9876");
        producer.setSendMsgTimeout(6000);
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" ,
                    "TagB" ,
                    ("Hello RocketMQHJW " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) 
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
            Thread.sleep(1000);
//            TimeUnit.SECONDS.sleep(10);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

其中遇到的问题会有同步发送消息超时,解决方案,设置一下java客户端发送消息超时时间
https://blog.csdn.net/qq_16241519/article/details/103926356

发送消息的返回值如下情况,显示了消息发送在哪个broker下

2.发送异步消息

和发送同步消息基本一样,只是在producer.send的时候写入返回如果成功或者异常的时候如何处理

package com.test.mq.rocketmq.base.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;


public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group2");
        // 设置NameServer的地址
        producer.setNamesrvAddr("10.211.55.10:9876;10.211.55.8:9876");
        producer.setSendMsgTimeout(6000);
        // 启动Producer实例
        producer.start();
//        producer.setRetryTimesWhenSendAsyncFailed(0);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest",
                    "TagB",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback接收异步返回结果的回调
            producer.send(msg, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index,
                            sendResult.getMsgId());
                }
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }

            });
            Thread.sleep(1000);

        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

发送结果如下

3.发送单向消息

如果生产者不关心消息发送是否成功失败的情况可以使用,例如发送日志

package com.test.mq.rocketmq.base.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;


public class onewayProducer {
    public static void main(String[] args) throws Exception{
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 设置NameServer的地址
        producer.setNamesrvAddr("10.211.55.10:9876;10.211.55.8:9876");
        producer.setSendMsgTimeout(6000);
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" ,
                    "TagA" ,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) 
            );
            // 发送单向消息,没有任何返回结果
            producer.sendoneway(msg);
            Thread.sleep(1000);

        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}
消息消费
package com.test.mq.rocketmq.base.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;


public class Consumer {
//1.创建消费者Consumer,制定消费者组名
//2.指定Nameserver地址
//3.订阅主题Topic和Tag
//4.设置回调函数,处理消息
//5.启动消费者consumer
public static void main(String[] args) throws Exception {
    // 实例化消息生产者,指定组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    // 指定Namesrv地址信息.
    consumer.setNamesrvAddr("10.211.55.10:9876;10.211.55.8:9876");
    // 订阅Topic
    consumer.subscribe("TopicTest", "TagB");
    //负载均衡模式消费
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册回调函数,处理消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                        ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n",
                    Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //启动消息者
    consumer.start();
    System.out.printf("Consumer Started.%n");
}
}

打印结果

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/301486.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号