1.下载RocketMq镜像文件
docker pull rocketmqinc/rocketmq
2.创建namesrv数据存储路径
mkdir -p /root/dk/rocketMq/data/namesrv/logs /root/dk/rocketMq/data/namesrv/store
3.构建namesrv容器
docker run -d --restart=always --name rmqnamesrv -p 9876:9876 -v /root/dk/rocketMq/data/namesrv/logs:/root/logs -v /root/dk/rocketMq/data/namesrv/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv
4.创建broker数据存储路径
mkdir -p /root/dk/rocketMq/data/broker/logs /root/dk/rocketMq/data/broker/store /root/dk/rocketMq/conf
5.创建配置文件
vi /root/dk/rocketMq/conf/broker.conf # 所属集群名称,如果节点较多可以配置多个 brokerClusterName = DefaultCluster #broker名称,master和slave使用相同的名称,表明他们的主从关系 brokerName = broker-a #0表示Master,大于0表示不同的slave brokerId = 0 #表示几点做消息删除动作,默认是凌晨4点 deleteWhen = 04 #在磁盘上保留消息的时长,单位是小时 fileReservedTime = 48 #有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制; brokerRole = ASYNC_MASTER #刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要; flushDiskType = ASYNC_FLUSH # 设置broker节点所在服务器的ip地址 brokerIP1 = 10.160.32.28
6.构建broker容器
docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 -v /docker/rocketmq/data/broker/logs:/root/logs -v /root/dk/rocketMq/data/broker/store:/root/store -v /root/dk/rocketMq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
7.拉取rocketMq-console服务
docker pull pangliang/rocketmq-console-ng
8.构建rocketmq-console容器
docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=10.160.32.28:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 9999:8080 pangliang/rocketmq-console-ng
9.创建Topic
sh mqadmin updateTopic -n 10.160.32.28:9876 -b localhost:10911 -t DkTestTopic1
问题:unable to calculate a request signature. error=Algorithm HmacSHA1 not available
解决方法:
/opt/rocketmq-4.4.0/bin/tools.sh文中JAVA_OPT末尾加上jre的具体路径,jre的路径可以利用find / -type d -name jre查找
10.创建订阅组
sh mqadmin updateSubGroup -c DefaultCluster -g TopicTestGroup -n 10.160.32.28:9876 -b localhost:10911
11.生产者发送消息
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
12.消费者接收消息
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
13.删除Topic
sh mqadmin deleteTopic -t TopicTest -c DefaultCluster -n 10.160.32.28:9876
14.删除订阅组
sh mqadmin deleteSubGroup -g topicTestGroup -c DefaultCluster -n 10.160.32.28:9876
java验证发送接收消息:
public static void produces() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
producer.setNamesrvAddr("10.160.32.28:9876");
producer.start();
for (int i = 0; i < 5; i++) {
//1.创建消息
Message message = new Message(
"DkTestTopic1",//主题
"TagA",//标签
"key" + i,//用户自定义的key,唯一的标识
("Hello RocketMQ测试消息-=-========" + i).getBytes()//消息内容实体(byte[])
);
//2.发送消息
SendResult sr = producer.send(message);
System.out.println("消息发出: " + sr);
}
producer.shutdown();
}
public static void consumers() throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
// Specify name server addresses.
consumer.setNamesrvAddr("10.160.32.28:9876");
// Subscribe one more topics to consume. *表示订阅所有tags
consumer.subscribe("DkTestTopic1", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
9876端口在外部不能访问时查看
sysctl net.ipv4.ip_forward
值为0时需要在/etc/sysctl.conf配置文件中将其修改为1
查看防火墙是否打开
# 防火墙状态查看 systemctl status firewalld # 防火墙关闭 systemctl stop firewalld.service # 添加端口到防火墙(--permanent表示永久生效,没有此参数重启后失效) firewall-cmd --zone=public --add-port=9876/tcp --permanent # 重新加载 firewall-cmd --reload # 查看端口添加是否成功 firewall-cmd --zone=public --query-port=9876/tcp # 删除端口 firewall-cmd --zone=public --remove-port=9876/tcp --permanent



