- 入门案例
- NameServer 地址
- 发送消息
- 同步发送消息
- 异步发送消息
- 一次性发送消息
- 生产者组、消息封装
- 接收消息
- 消费方式:推式消费、拉式消费
- 消息方式:集群模式、广播模式
- 消息消费位置
- 消息确认
入门案例Java 从 0 到架构师目录:【Java从0到架构师】学习记录
org.apache.rocketmq rocketmq-client 4.7.0
生产者:
public class ProducerDemo {
public static void main(String[] args) throws Exception {
//1 创建一个生产者
DefaultMQProducer producer = new DefaultMQProducer("maoge");
//2 指定NameServer的地址
producer.setNamesrvAddr("192.168.52.128:9876");
//3 启动生产者
producer.start();
for (int i = 0; i < 100; i++) {
// 4 创建消息对象
Message msg = new Message("01_hello", ("hello" + i).getBytes());
// 5 发送消息
SendResult result = producer.send(msg);
System.out.println(result.getSendStatus());
System.out.println("result.getMsgId() = " + result.getMsgId());
}
//6 关闭应用程序
producer.shutdown();
}
}
消费者:
public class ConsumerDemo {
public static void main(String[] args) throws MQClientException {
//1 创建一个消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("maoge");
// 设置NameServer的地址, 通过配置环境变量、JVM参数, 可不写死
consumer.setNamesrvAddr("192.168.52.128:9876");
//2 指定消费的主题
consumer.subscribe("01_hello","*");
//3 指定从哪个位置开始消费(只是对第一次启动消费的时候有效果)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//4 指定一个监听器, 并发消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println(Thread.currentThread().getName() + "收到消息:" + new String(messageExt.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5 启动消费者
consumer.start();
}
}
NameServer 地址
在生产者或消费者找 NameServer 的地址不能写死(代码要在多环境运行),以下方式可以取代在代码中写死 consumer.setNamesrvAddr("192.168.52.128:9876");
- 方法一:配置环境变量,NAMESRV_ADDR 的值为 IP地址
- 方法二:通过 JVM 参数传递
同步,指的是在发送数据到消息中间件的时候,需要及时的返回一个结果到发送者
- 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知
在异步发送消息的时候,将数据发送到消息中心,此时程序不会等待消息中心的返回结果,而是继续往下执行,当有结果返回的时候,通过异步通知的方式告诉消息生产者
- 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker 的响应
public class ProducerDemo {
public static void main(String[] args) throws Exception {
//1 创建一个生成者对象
DefaultMQProducer producer = new DefaultMQProducer("async");
//2 设置NameServer的地址
// producer.setNamesrvAddr("192.168.52.128:9876");
//3 启动生产者
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
//4 创建消息Message
//5 发送消息
//创建一个计数器
final CountDownLatch2 countDownLatch = new CountDownLatch2(5);
for (int i = 0; i < 5; i++) {
Message msg = new Message("02_async", "hello, NBA".getBytes());
try {
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(Thread.currentThread().getName());
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功" + JSON.toJSONString(sendResult));
}
countDownLatch.countDown();
}
//异常处理
@Override
public void onException(Throwable e) {
e.printStackTrace();
countDownLatch.countDown();
}
});
} catch (MQClientException e) {
//补救措施 把对应的消息先保存到另外一个地方 MySQL, 自己到时候重新触发 发送消息
e.printStackTrace();
} catch (RemotingException e) {
//补救措施
e.printStackTrace();
} catch (InterruptedException e) {
//补救措施
e.printStackTrace();
}
}
// 如果使用异步操作, 需要等待接收完所有的异步返回结果之后, 再去关闭主线程
countDownLatch.await();// 等待计数器归0
//6 关闭生产者
producer.shutdown();
}
}
一次性发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送
public class ProducerDemo {
public static void main(String[] args) throws Exception {
//1 创建一个生成者对象
DefaultMQProducer producer = new DefaultMQProducer("03_oneway");
//2 设置NameServer的地址
// producer.setNamesrvAddr("192.168.52.128:9876");
//3 启动生产者
producer.start();
//4 创建消息Message
//5 发送消息
//创建一个计数器
for (int i = 0; i < 5; i++) {
// Tag: 消息标签, 用来进行消息分类 支付消息--> wx支付,zfb支付
// Key: 业务消息 订单消息: key: 订单id/订单号
Message msg = new Message("03_oneway", "TagB", i + "", "hello, NBA".getBytes());
producer.sendOneway(msg);//日志, 大数据应用
}
//6 关闭生产者
producer.shutdown();
}
}
生产者组、消息封装
生产者组:
同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
消息封装:
- Tag:用来给消息进行标记
通过 Tag 对消息进行分类,把不同类型的消息交给不同的消费者进行消费 - Key:可以设置消息的一个唯一ID,用于区分每个消息的标志、业务ID
在管理控制台,可以通过 Key 进行消息的查询跟踪
推式消费:消息中心主动推送消息给消费者消费(需要绑定推送事件)
public class PushConsumerDemo {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("type");
consumer.setNamesrvAddr("192.168.52.128:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//订阅主题, 后面可以指定表达式Tag, 根据tag进行消息过滤
consumer.subscribe("01_hello","*");
//注册消息消费监听 有消息就会触发consumeMessage方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
System.out.println("接收消息线程: "+Thread.currentThread().getName());
for (MessageExt msg : msgs) {
System.out.println("接收到的消息:" + new String(msg.getBody()));
}
//确定消息消费从
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
}
}
拉式消费:消费者主动去消费中心拉取消息(默认 5 秒拉取 1 次)
public class PullConsumerDemo {
public static void main(String[] args) throws Exception {
//创建一个拉取消息的消费者对象
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("pullType2");
//设置对应的NameServer地址
consumer.setNamesrvAddr("192.168.52.128:9876");
//设置从那里开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//订阅对应的Topic
consumer.subscribe("01_hello","*");
//启动消费者
consumer.start();
while (true) {
//开始获取消息
List msgs = consumer.poll();
//处理消息结果
for (MessageExt msg : msgs) {
System.out.println("接收到的消息:" + new String(msg.getBody()));
}
}
}
}
消息方式:集群模式、广播模式
集群模式:对于同一个消费者组里面的多个消费者,每个消费者消费的消息都是不一样的,相当于消费者的负载均衡
设置消费者的模式:
consumer.setMessageModel(MessageModel.CLUSTERING); // 集群模式
广播模式:每个消费者都会接受全部的消息,所有消费者消费的数据都是一样的
- 一般用于对于消息需要多个其他业务进行处理
设置消费者的模式:
consumer.setMessageModel(MessageModel.BROADCASTING); // 广播模式消息消费位置
在指定消费的 pos 位置的时候,会优先获取服务端记录的上次消费点,所以该参数只有在服务端没有对应的消费者的记录的时候有效,一般情况是第一次启动的消费者有效。
- CONSUME_FROM_FIRST_OFFSET:从最开始的位置消费,会消费该 Topic 下面所有的有效的数据,过期的数据会删除掉
- CONSUME_FROM_LAST_OFFSET:
1 如果该 Topic 的数据都是最近的数据,没有过期数据,则从最开始的位置消费
2 如果该 Topic 的数据存在过期的数据,则从最后的位置开始消费,只会消费新加入的数据 - CONSUME_FROM_TIMESTAMP
根据指定的时间戳进行消费,配合 consumer.setConsumeTimestamp("20200612083300"); 从指定的时间开始消费;如果不指定,则默认从半个小时前的数据开始消费
拉式消费:可以通过 consumer.setAutoCommit(false); 设置是否自动提交;
- 如果设置为手动提交,需要使用 consumer.commitSync(); 方法进行手动提交
- 对于未提交的操作,Topic 中的订阅的偏移量不会发生改变,下次消费的时候会继续消费改数据
推式消费:
- 返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态表示消费成功
- 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 表示消费不成功,会放入到重试队列
- 默认重试采用服务端重试(重试次数 16 次)



