- Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
- NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:
-
- Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。
- 提供心跳检测机制,检查Broker是否还存活;
-
- 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
1.下载: Apache Downloadshttps://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip
2.解压
注意:解压文件不要放在中文目录下
3配置环境变量
ROCKETMQ_HOME="rocketmq本地路径" NAMESRV_ADDR="localhost:9876"
4.启动
4.1 启动nameserver
Cmd命令框执行进入至‘MQ文件夹bin’下,然后执行‘mqnamesrv.cmd’,启动nameserver。成功后如下,此框勿关闭。
4.2
启动broker
Cmd命令框执行进入至‘MQ文件夹bin’下,然后执行‘mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true’,启动broker。成功后如下,此框勿关闭。
使用:
1.创建maven quickstart项目
2.添加依赖
org.apache.rocketmq rocketmq-client4.9.1
3.简单发送
package com.woniuxy.cloud63.simple;
import com.woniuxy.cloud63.AppConstans;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
import java.util.Scanner;
public class Sender {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("TestSender");
producer.setNamesrvAddr(AppConstans.ROCKETMQ_NAMESERVER_ADDR);
producer.start();
Scanner scanner = new Scanner(System.in);
System.out.println("请输入要发送的消息");
String smsContext = scanner.next();
if(smsContext.equals("exit")){
producer.shutdown();
}
for(int i =0;i<10;i++){
Message msg = new Message(AppConstans.SMS_TOPIC, "user_register", (smsContext+i).getBytes("UTF-8"));
SendResult sendResult = producer.send(msg);
System.out.println("sendResult:"+sendResult);
}
}
}
4.简单接收
package com.woniuxy.cloud63.simple;
import com.woniuxy.cloud63.AppConstans;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
public class Receiver {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestReceive");
consumer.setNamesrvAddr(AppConstans.ROCKETMQ_NAMESERVER_ADDR);
consumer.subscribe(AppConstans.SMS_TOPIC,"*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)->{
for (MessageExt msgExt:msgs){
try {
System.out.println("消息内容:"+new String(msgExt.getBody(),"utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
备注:
package com.woniuxy.cloud63;
public interface AppConstans {
String ROCKETMQ_NAMESERVER_ADDR="localhost:9876";
String SMS_TOPIC="SMS";
}
发送的三种模式:
发送-同步确认发送结果
同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。
- 应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
发送-异步确认发送结果
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列RocketMQ版的异步发送,需要您实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
- 应用场景:异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
发送-结束 oneway
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
- 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
| 发送方式 | 发送TPS | 发送结果反馈 | 可靠性 |
| 同步发送 | 快 | 有 | 不丢失 |
| 异步发送 | 更快 | 有 | 不丢失 |
| 单向发送 | 最快 | 无 | 可能丢失 |



