栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

基于Docker搭建RocketMq

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于Docker搭建RocketMq

1.下载RocketMq镜像文件

docker pull rocketmqinc/rocketmq

2.创建namesrv数据存储路径

mkdir -p  /root/dk/rocketMq/data/namesrv/logs   /root/dk/rocketMq/data/namesrv/store

3.构建namesrv容器

docker run -d --restart=always --name rmqnamesrv -p 9876:9876 -v /root/dk/rocketMq/data/namesrv/logs:/root/logs -v /root/dk/rocketMq/data/namesrv/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv

4.创建broker数据存储路径

mkdir -p   /root/dk/rocketMq/data/broker/logs    /root/dk/rocketMq/data/broker/store   /root/dk/rocketMq/conf

5.创建配置文件

vi /root/dk/rocketMq/conf/broker.conf
# 所属集群名称,如果节点较多可以配置多个

brokerClusterName = DefaultCluster

#broker名称,master和slave使用相同的名称,表明他们的主从关系

brokerName = broker-a

#0表示Master,大于0表示不同的slave

brokerId = 0

#表示几点做消息删除动作,默认是凌晨4点

deleteWhen = 04

#在磁盘上保留消息的时长,单位是小时

fileReservedTime = 48

#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;

brokerRole = ASYNC_MASTER

#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;

flushDiskType = ASYNC_FLUSH

# 设置broker节点所在服务器的ip地址

brokerIP1 = 10.160.32.28

6.构建broker容器

docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 -v  /docker/rocketmq/data/broker/logs:/root/logs -v  /root/dk/rocketMq/data/broker/store:/root/store -v /root/dk/rocketMq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

7.拉取rocketMq-console服务

docker pull pangliang/rocketmq-console-ng

8.构建rocketmq-console容器

docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=10.160.32.28:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 9999:8080 pangliang/rocketmq-console-ng

9.创建Topic

sh mqadmin updateTopic -n 10.160.32.28:9876 -b localhost:10911 -t DkTestTopic1

问题:unable to calculate a request signature. error=Algorithm HmacSHA1 not available

解决方法:

/opt/rocketmq-4.4.0/bin/tools.sh文中JAVA_OPT末尾加上jre的具体路径,jre的路径可以利用find / -type d -name jre查找

10.创建订阅组

sh mqadmin updateSubGroup -c DefaultCluster -g TopicTestGroup -n 10.160.32.28:9876 -b localhost:10911

11.生产者发送消息

sh tools.sh org.apache.rocketmq.example.quickstart.Producer

12.消费者接收消息

sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

13.删除Topic

sh mqadmin deleteTopic -t TopicTest -c DefaultCluster -n 10.160.32.28:9876

14.删除订阅组

sh mqadmin deleteSubGroup -g topicTestGroup -c DefaultCluster -n 10.160.32.28:9876

java验证发送接收消息:

public static void produces() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {

        DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");

        producer.setNamesrvAddr("10.160.32.28:9876");

        producer.start();

        for (int i = 0; i < 5; i++) {

            //1.创建消息
            Message message = new Message(
                    "DkTestTopic1",//主题
                    "TagA",//标签
                    "key" + i,//用户自定义的key,唯一的标识
                    ("Hello RocketMQ测试消息-=-========" + i).getBytes()//消息内容实体(byte[])
            );

            //2.发送消息
            SendResult sr = producer.send(message);
            System.out.println("消息发出: " + sr);
        }

        producer.shutdown();

    }

    public static void consumers() throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");

        // Specify name server addresses.
        consumer.setNamesrvAddr("10.160.32.28:9876");

        // Subscribe one more  topics to consume. *表示订阅所有tags
        consumer.subscribe("DkTestTopic1", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }

9876端口在外部不能访问时查看

 sysctl  net.ipv4.ip_forward

值为0时需要在/etc/sysctl.conf配置文件中将其修改为1

查看防火墙是否打开

# 防火墙状态查看
systemctl status firewalld

# 防火墙关闭
systemctl stop firewalld.service

# 添加端口到防火墙(--permanent表示永久生效,没有此参数重启后失效)
firewall-cmd --zone=public --add-port=9876/tcp --permanent

# 重新加载
firewall-cmd --reload

# 查看端口添加是否成功
firewall-cmd --zone=public --query-port=9876/tcp

# 删除端口
firewall-cmd --zone=public --remove-port=9876/tcp --permanent

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/320539.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号