- 安装与启动roketmq
- 使用RocketMQ原生API收发消息
安装roketmq
启动name server命令:
# 进入 rocketmq 目录 cd /usr/local/rocketmq/ # 启动 name server nohup sh bin/mqnamesrv & # 查看运行日志, 看到"The Name Server boot success."表示启动成功 tail -f ~/logs/rocketmqlogs/namesrv.log
启动broker命令:
# 启动 broker, 连接name server: localhost:9876 nohup sh bin/mqbroker -n localhost:9876 & # 查看运行日志, 看到"The broker[......:10911] boot success."表示启动成功 tail -f ~/logs/rocketmqlogs/broker.log
启动管理界面:
在root目录下执行
nohup java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 --rocketmq.config.namesrvAddr=localhost:9876 &
访问http://192.168.64.141:8080
使用RocketMQ原生API收发消息依赖:
org.apache.rocketmq rocketmq-client 4.7.1 org.apache.rocketmq rocketmq-store 4.7.1
生产者:
public class Producer {
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
// 新建生产者实例
DefaultMQProducer p = new DefaultMQProducer("p1");
// 设置 name server 地址
p.setNamesrvAddr("192.168.64.141:9876");
// 启动 (连接服务器)
p.start();
while (true) {
System.out.print("输入消息:");
String s = new Scanner(System.in).nextLine();
// 把消息封装到 Message 对象
// import org.apache.rocketmq.common.message.Message;
// Topic 相当于是一级分类
// Tag 相当于是二级分类
Message msg = new Message("Topic1", "Tag1", s.getBytes());
msg.setDelayTimeLevel(3);//十秒后向消费者投递
// 发送消息
SendResult r = p.send(msg);
// 打印服务器的反馈信息
System.out.println(r);
}
}
}
消费者:
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 新建消费者实例
DefaultMQPushConsumer c = new DefaultMQPushConsumer("c1");
// 设置 name server
c.setNamesrvAddr("192.168.64.141:9876");
// 订阅消息
// * - 所有标签
// Tag1 || Tag2 || Tag3 - 多种标签
// Tag1 - 一种标签
c.subscribe("Topic1", "Tag1");
// 设置处理消息的监听器
// Concurrently - 多个线程并发接收处理消息
c.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String s = new String(msg.getBody());
System.out.println("收到:"+s);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 启动
c.start();
}
}



