栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

使用docker安装RocketMq

使用docker安装RocketMq

1.创建namesrv服务
拉取镜像

docker pull rocketmqinc/rocketmq

创建nameServer存储路径

mkdir -p  /docker/rocketmq/data/namesrv/logs  /docker/rocketmq/data/namesrv/store

构建namesrv容器

docker run -d 
--restart=always 
--name rmqnamesrv 
-p 9876:9876 
-v /docker/rocketmq/data/namesrv/logs:/root/logs 
-v /docker/rocketmq/data/namesrv/store:/root/store 
-e "MAX_POSSIBLE_HEAP=100000000" 
rocketmqinc/rocketmq 
sh mqnamesrv 

2.创建broker节点
创建broker数据存储路径

mkdir -p  /docker/rocketmq/data/broker/logs   /docker/rocketmq/data/broker/store /docker/rocketmq/conf

创建配置文件

vi /docker/rocketmq/conf/broker.conf
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
#设置broker节点所在服务器的ip地址(公网IP)
brokerIP1 = 192.168.52.136

构建broker容器

docker run -d  
--restart=always 
--name rmqbroker 
--link rmqnamesrv:namesrv 
-p 10911:10911 
-p 10909:10909 
-v  /docker/rocketmq/data/broker/logs:/root/logs 
-v  /docker/rocketmq/data/broker/store:/root/store 
-v /docker/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf 
-e "NAMESRV_ADDR=namesrv:9876" 
-e "MAX_POSSIBLE_HEAP=200000000" 
rocketmqinc/rocketmq 
sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf 

3.创建rockermq-console服务
拉取镜像

docker pull pangliang/rocketmq-console-ng

构建rockermq-console容器

docker run -d 
--restart=always 
--name rmqadmin 
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.52.136:9876 
-Dcom.rocketmq.sendMessageWithVIPChannel=false" 
-p 9999:8080 
pangliang/rocketmq-console-ng
IP为内网IP

4.开放端口

5.测试

6.编写代码
引入maven依赖

 
            org.apache.rocketmq
            rocketmq-client
            4.5.1
        

生产者

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        final DefaultMQProducer producer=new DefaultMQProducer("test_producer");
        //这里需要设置NameServer地址
        producer.setNamesrvAddr("101.43.12.115:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
         new Thread(){
             @Override
             public void run() {
                 while (true){
                     try {
                     Message message=new Message("TopicTest","TagA",("Test").getBytes(RemotingHelper.DEFAULT_CHARSET));
                         producer.send(message);
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
                 }
             }
         }.start();
        }
        while (true){
            Thread.sleep(1000);
        }
    }
}

消费者

public class Consumer {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("test_consumer");
        //这里需要设置NameServer地址
        consumer.setNamesrvAddr("101.43.12.115:9876");
        //订阅Topic,你要消费哪些Topic的消息
        consumer.subscribe("TopicTest","*");
        //这里注册一个回掉接口,去接收获取到的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println(list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

结果显示

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/721854.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号