栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

消息中间件RocketMQ环境搭建、测试及使用

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

消息中间件RocketMQ环境搭建、测试及使用

一、概述

RocketMQ是阿里巴巴开源的分布式消息中间件,现在是Apache的一个顶级项目。在阿里内部使用非常广泛,已经经过了"双11"这种万亿级的消息流转,性能稳定、高效。

二、环境搭建

下载RocketMQ
我们在linux平台下安装一个RocketMQ的服务。
下载RocketMQ,地址 https://github.com/apache/rocketmq/releases

环境要求
Linux 64位操作系统
64bit JDK 1.8+

安装RocketMQ
1 创建工作目录

[root@allen ~]#  mkdir /usr/rocketmq

2 上传文件到Linux系统
rocketmq-all-4.4.0-bin-release.zip

3 解压到安装目录

[root@allen rocketmq]# unzip rocketmq-all-4.4.0-bin-release.zip
[root@allen rocketmq]# mv rocketmq-all-4.4.0-bin-release ../rocketmq

启动RocketMQ
1 切换到工作目录

cd  /usr/rocketmq/rocketmq-all-4.4.0-bin-release/bin

2 启动NameServer
[root@allen bin]# nohup ./mqnamesrv &
[1] 1467
#只要进程不报错,就应该是启动成功了,可以查看一下日志
[root@allen bin]# tail -f /root/logs/rocketmqlogs/namesrv.log

3 启动Broker
#编辑bin/runbroker.sh 和 bin/runserver.sh文件,根据本级配置调整工作内存大小,修改里面的
#JAVA_OPT="KaTeX parse error: Expected 'EOF', got '#' at position 42: …-Xmx8g -Xmn4g" #̲为JAVA_OPT="{JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
启动Broker命令如下:

[root@allen bin]# nohup bin/mqbroker -n localhost:9876 &
[root@allen bin]# tail -f /root/logs/rocketmqlogs/broker.log
三、测试RocketMQ

1 测试消息发送

[root@allen bin]# export NAMESRV_ADDR=localhost:9876
[root@allen bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer


2 测试消息接收

[root@allen bin]# export NAMESRV_ADDR=localhost:9876
[root@allen bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Consumer


关闭RocketMQ

[root@allen bin]# ./mqshutdown broker
[root@allen bin]# ./mqshutdown namesrv
四、使用JAVA编程进行消息收发

接下来我们使用Java代码来演示消息的发送和接收
1 在POM文件用导入依赖


	org.apache.rocketmq
	rocketmq-spring-boot-starter
	2.0.2

2 发送消息
消息发送步骤:

  1. 创建消息生产者, 指定生产者所属的组名
  2. 指定Nameserver地址
  3. 启动生产者
  4. 创建消息对象,指定主题、标签和消息体
  5. 发送消息
  6. 关闭生产者
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class RocketMQSendMessageTest {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者,并设置生产组名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");

        //2.为生产者设置NameServer地址
        producer.setNamesrvAddr("192.168.1.110:9876");

        //3.启动生产者
        producer.start();

        //4.构建消息对象,设置主题、标签、内容
        Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes());

        //5.发送消息
        SendResult result = producer.send(message, 10000);
        System.out.println(result);

        //6.关闭生产者
        producer.shutdown();
    }
}

发送成功

3 接收消息
消息接收步骤:

  1. 创建消息消费者, 指定消费者所属的组名
  2. 指定Nameserver地址
  3. 指定消费者订阅的主题和标签
  4. 设置回调函数,编写处理消息的方法
  5. 启动消息消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class RocketMQReceiveMessageTest {
    public static void main(String[] args) throws Exception {
        //1. 创建消息消费者, 指定消费者所属的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");

        //2. 指定Nameserver地址
        consumer.setNamesrvAddr("192.168.1.110:9876");

        //3. 指定消费者订阅的主题和标签
        consumer.subscribe("myTopic", "*");

        //4. 设置回调函数,编写处理消息的方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println("Receive New Messages: " + msgs);

                //返回消费状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //5. 启动消息消费者
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

接收成功

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/511167.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号