栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

roketmq安装与启动,测试收发消息

roketmq安装与启动,测试收发消息

文章目录
    • 安装与启动roketmq
    • 使用RocketMQ原生API收发消息

安装与启动roketmq

安装roketmq
启动name server命令:

# 进入 rocketmq 目录
cd /usr/local/rocketmq/

# 启动 name server
nohup sh bin/mqnamesrv &

# 查看运行日志, 看到"The Name Server boot success."表示启动成功
tail -f ~/logs/rocketmqlogs/namesrv.log

启动broker命令:

# 启动 broker, 连接name server: localhost:9876
nohup sh bin/mqbroker -n localhost:9876 &

# 查看运行日志, 看到"The broker[......:10911] boot success."表示启动成功
tail -f ~/logs/rocketmqlogs/broker.log 

启动管理界面:
在root目录下执行

nohup java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 --rocketmq.config.namesrvAddr=localhost:9876 &

访问http://192.168.64.141:8080

使用RocketMQ原生API收发消息

依赖:

	
        
            org.apache.rocketmq
            rocketmq-client
            4.7.1
        

        
            org.apache.rocketmq
            rocketmq-store
            4.7.1
        
    

生产者:

public class Producer {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        // 新建生产者实例
        DefaultMQProducer p = new DefaultMQProducer("p1");
        // 设置 name server 地址
        p.setNamesrvAddr("192.168.64.141:9876");
        // 启动 (连接服务器)
        p.start();

        while (true) {
            System.out.print("输入消息:");
            String s = new Scanner(System.in).nextLine();
            // 把消息封装到 Message 对象
            // import org.apache.rocketmq.common.message.Message;
            // Topic 相当于是一级分类
            // Tag 相当于是二级分类
            Message msg = new Message("Topic1", "Tag1", s.getBytes());
            msg.setDelayTimeLevel(3);//十秒后向消费者投递
            // 发送消息
            SendResult r = p.send(msg);
            // 打印服务器的反馈信息
            System.out.println(r);
        }
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 新建消费者实例
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("c1");
        // 设置 name server
        c.setNamesrvAddr("192.168.64.141:9876");

        // 订阅消息
        // *                        - 所有标签
        // Tag1 || Tag2 || Tag3     - 多种标签
        // Tag1                     - 一种标签
        c.subscribe("Topic1", "Tag1");

        // 设置处理消息的监听器
        // Concurrently - 多个线程并发接收处理消息
        c.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String s = new String(msg.getBody());
                    System.out.println("收到:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                // return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        // 启动
        c.start();
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/612504.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号