|
1,simple(简单消息生产)
public static void onewayProducer() throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("simple_producerGroup_test2");
// 设置NameServer的地址
producer.setNamesrvAddr("10.131.236.179:19300;10.131.236.180:19301");
// 启动Producer实例
producer.start();
for (int i = 0; i < 10; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("simple_topic_test2" ,
"TagA" ,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送单向消息,没有任何返回结果
producer.sendoneway(msg);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
public static void SyncProducer() throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("simple_producerGroup_test2");
// 设置NameServer的地址
producer.setNamesrvAddr("10.131.236.179:19300;10.131.236.180:19301");
// 启动Producer实例
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("simple_topic_test2" ,
"TagA" ,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 通过sendResult返回消息是否成功送达
System.out.printf("%s%n", sendResult);
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
public static void AsyncProducer () throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("simple_producerGroup_test3");
// 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动Producer实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("simple_topic_test3",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 等待5s
countDownLatch.await(5, TimeUnit.SECONDS);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
|
2,filter(过滤消息)
过滤消息有两种方式:Tag 和 SQL 表达式
SQL 基本语法:
RocketMQ 只定义了一些基本语法来支持这个特性,支持扩展。
数值比较,比如 >、>=、<、<=、BETWEEN、=;字符比较,比如 =、<>、IN;IS NULL 或者 IS NOT NULL;逻辑符号 AND、OR、NOT。
常量支持的额类型为:
数值,比如 123、3.1415;字符,比如 ‘abc’,必须用单引号包裹起来;NULL,特殊的常量;布尔值,TRUE 或 FALSE。
只有使用 push 模式的消费者才能使用 SQL92 标准的 SQL 语句
注:消费者sql的使用需要rocketmq开启broker配置项的enablePropertyFilter=true,
默认没开启,如需开启使用前提前找对应开通人处理
生产者代码:
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("filter_producerGroup_test1");
// 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动Producer实例
producer.start();
for(int i=0;i<10;i++) {
Message msg = new Message("filter_topic_test1",
"tagA",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
// System.out.println(sendResult.getMessageQueue());
System.out.println(String.format("SendResult status:%s,brokerName:%s, queueId:%d, queueOffset:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getBrokerName(),sendResult.getMessageQueue().getQueueId(),
sendResult.getQueueOffset()));
}
// 如果不再发送消息,关闭Producer实例。
producer.shutdown(); |
消费者代码:
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_consumerGroup_test1");
// 设置NameServer的地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
// 需要配置broker的enablePropertyFilter=true
consumer.subscribe("filter_topic_test1", MessageSelector.bySql("a between 0 and 3"));
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List |
3,batch(批量消息)
3.1 批量发送消息
批量发送消息能显著提高传递小消息的性能,限制是这些批量消息应该有相同的 Topic,相同的 waitStoreMsgOK,而且不能是延时消息,此外,这一批消息的总大小不应超过 4MB。
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("batch_producerGroup_test2");
// 设置NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动Producer实例
producer.start();
String topic = "batch_topic_test2";
//批量处理1--每次批处理不超过4M
List |
消息分割工具类:
public class ListSplitter implements Iterator |
3.2 批量消费消息
控制参数:
consumer.setPullInterval(0); //每次拉取消息间隔时间,默认0,长轮询 consumer.setPullBatchSize(32); //每批次从broker拉取消息的最大个数,默认32 consumer.setConsumeMessageBatchMaxSize(1); //默认值1,单个线程每次消费的条数,值越小越能更大化的利用多线程去执行,假如我们每个消息的处理耗时很长,那么这个参数就应该设置的偏小一点,不要让单个消息的消费慢影响同一批中的其他消息 |
4,order(有序消息)
订单号相同的消息会被先后发送到同一个broker的同一个队列中,保证顺序性
生产代码:
// 订单流程
String[] msgs = new String[] {"创建消息", "付款消息", "推送消息", "完成消息"};
for (int i = 0; i < msgs.length; i++) {
// 创建消息, 指定Topic、Tag、key和消息体
Message msg = new Message("OrderTopic", "Order", "i" + i, msgs[i].getBytes("UTF-8"));
// 发送消息到一个Broker, 参数二: 消息队列的选择器, 参数三: 选择的业务标识
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List |
消费代码:
// 订阅Topic、Tag
consumer.subscribe("OrderTopic", "*");
// 设置回调函数, 处理消息
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List |
5,scheduled(延时消息)
应用场景:
比如电商场景,提交了一个订单就可以发送一个延时消息,1h 后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
默认时延配置:
messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";对应1~18,18个等级,1是1s,2是5s...,可通过配置修改时延的时长,可在申请开通时找对应开通人开通
生产者代码:
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("scheduled_producerGroup_test1");
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("scheduled_topic_test1", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
// 关闭生产者
producer.shutdown(); |
6,transaction(事务消息)
- 事务消息不支持延时消息和批量消息。为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SEConDS 来改变这个限制,该参数优先于 transactionTimeout 参数。事务性消息可能不止一次被检查或消费。提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
生产者代码:
// 使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,设置自定义线程池来处理这些检查请求。
// 执行本地事务后、需要根据执行结果对消息队列进行回复。
// 回传的事务状态:
// TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
// TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
// TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("transaction_producerGroup_test1");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue |
事务消息监听处理逻辑 :
public class TransactionListenerImpl implements TransactionListener {
// 当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。
// 返回三个事务状态之一。
// checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。
// 也是返回三个事务状态之一。
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap |
7,开启消费端的广播模式(默认集群模式)
使用广播模式有两个条件:
1, 创建消费者组的时候开启consumerBroadcastEnable=true
2, 代码里加上
consumer.setMessageModel(MessageModel.BROADCASTING);



