4.7.0版本引入2.1.0版本,4.8.0引入2.2.0
|
生产者配置:
#消息轨迹功能,如果开启了acl控制 #在给生产者和消费者分配用户权限时需要额外分配-i PUB(defaultTopicPerm=PUB),否则默认的RMQ_SYS_TRACE_TOPIC无权限 #如果需要指定自定义消息轨迹topic,需要提前申请创建对应的topic,默认自动创建被禁用 rocketmq.producer.customized-trace-topic=RMQ_SYS_TRACE_TOPIC rocketmq.producer.enable-msg-trace=true #rocketmq连接地址 rocketmq.name-server=127.0.0.1:9876 #生产者组自定义即可 rocketmq.producer.group=my-group #集群关闭自动创建topic的话topic无法自动创建,需要提前通过管理台创建 demo.rocketmq.topic=test #配置acl用户名密码 rocketmq.producer.access-key=rocketmq02 rocketmq.producer.secret-key=12345678 #其他的NameServer地址,对于需要操作多数据源的项目配置不同于rocketmq.name-server的值即可 demo.rocketmq.extNameServer=127.0.0.1:9876 ### 性能优化 ##生产者发送消息超时时间,默认3s rocketmq.producer.send-message-timeout=15000 ##生产者消息压缩大小,默认达到4KB启用消息压缩 rocketmq.producer.compress-message-body-threshold=4096 ##生产者发送消息最大字节数,默认4MB rocketmq.producer.max-message-size=4194304 ##生产者发送异步消息失败后重试次数,默认0次 rocketmq.producer.retry-times-when-send-async-failed=0 ##生产者消息失败容错策略,默认false不开启,生产环境建议开启 rocketmq.producer.retry-next-server=true ##生产者发送同步消息失败后重试次数,默认2次 rocketmq.producer.retry-times-when-send-failed=2 |
消费者配置:
#消费端配置 #Push模式 #demo.rocketmq.topic=test5 demo.rocketmq.consumerGroup=test5_group rocketmq.consumer.access-key=rocketmq06 rocketmq.consumer.secret-key=12345678 #性能优化 Push模式 #设置demo.rocketmq.selector-expression=order只消费tag=order的数据,消息过滤表达式 demo.rocketmq.selector-expression=* demo.rocketmq.selector-type=TAG demo.rocketmq.message-model=MessageModel.CLUSTERING #pull消费模式配置,pull模式只支持2.2.0以上的版本 #rocketmq.consumer.group=test1_group #rocketmq.consumer.topic=test1 #rocketmq.consumer.access-key=rocketmq2 #rocketmq.consumer.secret-key=12345678 #性能优化 Pull模式下 #rocketmq.consumer.selector-expression=* #rocketmq.consumer.pull-batch-size=10 #rocketmq.consumer.message-model=CLUSTERING #TAG or SelectorType.SQL92 #rocketmq.consumer.selector-type=TAG |
生产者代码:
@SpringBootTest
class DemoApplicationTests {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Resource(name = "extRocketMQTemplate")
private RocketMQTemplate extRocketMQTemplate;
@Value("${demo.rocketmq.topic}")
private String springTopic;
@Test
void SimpleonewayProducer() throws Exception {
rocketMQTemplate.sendoneWay(springTopic, "Hello, World!");
}
@Test
void SimpleSyncProducer() throws Exception {
SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
System.out.printf("syncSend1 to topic %s sendResult=%s %n", springTopic, sendResult);
}
@Test
void SimpleAsyncProducer() throws Exception {
rocketMQTemplate.asyncSend(springTopic, "Hello, World!", new SendCallback() {
@Override
public void onSuccess(SendResult var1) {
System.out.printf("async onSucess SendResult=%s %n", var1);
}
@Override
public void onException(Throwable var1) {
System.out.printf("async onException Throwable=%s %n", var1);
}
});
}
@Test
void ScheduledSyncProducer() throws Exception {
String replyString = rocketMQTemplate.syncSend(springTopic,MessageBuilder.withPayload("request string").build(),15000,3);
System.out.printf("send %s and receive %s %n", "request string", replyString);
}
@Test
void OrderProducer() throws Exception {
rocketMQTemplate.setMessageQueueSelector((List |
消费者代码:
1,push 模式
a,默认消费
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorexpression = "${demo.rocketmq.tag}")
public class StringConsumer implements RocketMQListener |
b,配置消费端消费起始位点
需要实现RocketMQPushConsumerLifecycleListener接口,在实现方法prepareStart里配置属性
代码如下:
@Service
@RocketMQMessageListener(
topic = "${demo.rocketmq.topic}",
consumerGroup = "${demo.rocketmq.consumerGroup}",
selectorexpression = "${demo.rocketmq.selector-expression}",
selectorType = SelectorType.TAG,
messageModel = MessageModel.CLUSTERING
// accessKey = "rocketmq02", // It will read by `rocketmq.consumer.access-key` key
// secretKey = "12345678" // It will read by `rocketmq.consumer.secret-key` key
)
public class ACLStringPushConsumer implements RocketMQListener |
c,修改消费重试的时间间隔以及自定义是否进入死信队列配置
rocketmq默认消息发送16次后数据会自动进入死信队列,每次重试的偏移量会在重试队列记录,修改重试的时间间隔以及自定义是否直接进入死信队列这些配置再springboot中暂时没有提供可用的接口,官方建议使用默认的方式,
如果有此业务需求,可以通过支持原生Listener的使用方式自己控制ConsumeConcurrentlyStatus实现,具体实现代码如下
@Service
@RocketMQMessageListener(
topic = "${demo.rocketmq.topic}",
consumerGroup = "${demo.rocketmq.consumerGroup}",
selectorexpression = "${demo.rocketmq.selector-expression}",
selectorType = SelectorType.TAG,
messageModel = MessageModel.CLUSTERING
// accessKey = "znb00004", // It will read by `rocketmq.consumer.access-key` key
// secretKey = "12345678" // It will read by `rocketmq.consumer.secret-key` key
)
public class ACLStringPushConsumer implements RocketMQPushConsumerLifecycleListener {
@Autowired
private MyMessageListenerConcurrently myMyMessageListenerConcurrently;
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
//设置从当前时间开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
System.out.println(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
//设置最大重试次数.默认16次
consumer.setMaxReconsumeTimes(10);
//配置重试消息逻辑,默认是context.setDelayLevelWhenNextConsume(0);
consumer.setMessageListener(myMyMessageListenerConcurrently);
}
} |
自定义消息监听器MyMessageListenerConcurrently实现
@Service
public class MyMessageListenerConcurrently implements MessageListenerConcurrently {
private static final Logger log = LoggerFactory.getLogger(MyMessageListenerConcurrently.class);
@Override
public ConsumeConcurrentlyStatus consumeMessage(List |
注:springboot批量消费consumeMessageBatchMaxSize配置了无效,默认每次都是单条串行消费的,如果对消费速率有需求可以使用pull模式或者原生rocketmq进行批量消费
2,pull模式
@Service
public class ACLStringPullConsumer implements CommandLineRunner {
public static volatile boolean running = true;
@Resource
private RocketMQTemplate rocketMQTemplate;
@Override
public void run(String... args) throws Exception {
// List |
3,等待回复的消费
String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", String.class, 3000);
生产者sendAndReceive对应的消费代码:
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.stringRequestTopic}", consumerGroup = "${demo.rocketmq.stringRequestConsumer}", selectorexpression = "${demo.rocketmq.tag}")
public class StringConsumerWithReplyString implements RocketMQReplyListener |
4,其他数据源消费
@Service
@RocketMQMessageListener(topic = "${demo.rocketmq.msgExtTopic}", selectorexpression = "tag0||tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")
public class MessageExtConsumer implements RocketMQListener |
如何声明不同name-server或者其他特定的属性来定义非标的RocketMQTemplate?
第一步: 定义非标的RocketMQTemplate使用你需要的属性,可以定义与标准的RocketMQTemplate不同的nameserver、groupname等。如果不定义,它们取全局的配置属性值或默认值。
// 这个RocketMQTemplate的Spring Bean名是'extRocketMQTemplate', 与所定义的类名相同(但首字母小写)
@ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876"
, ... // 定义其他属性,如果有必要。
)
public class ExtRocketMQTemplate extends RocketMQTemplate {
//类里面不需要做任何修改
}
第二步: 使用这个非标RocketMQTemplate
@Resource(name = "extRocketMQTemplate") // 这里必须定义name属性来指向上述具体的Spring Bean.
private RocketMQTemplate extRocketMQTemplate;
接下来就可以正常使用这个extRocketMQTemplate了。



