栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Springboot环境下RabbitMQ的批量消费(consumer-batch-enabled)

Springboot环境下RabbitMQ的批量消费(consumer-batch-enabled)

概述:

spring-rabbit2.2版本后添加了批量消费, 网上没找到相关的用法,特此记录一下

环境:

jdk1.8 RabbitMQ3.8.3 新建springboot项目

配置:

引入amqp,如下


    org.springframework.boot
    spring-boot-starter-amqp

修改application.yml配置如下

spring:
  rabbitmq:
    host: mymq.net
    virtual-host: /
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 250
        consumer-batch-enabled: true    #开启批量消费
        batch-size: 16                  #每次批量消费大小

指定批处理MessageListenerConverter(这里必须设置)

@Slf4j
@Component
public class BatchConfig {

    @Bean
    public String cusSimpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactory containerFactory) {
        log.info("设置批量-----");
        containerFactory.setBatchListener(true);
        return "cusSimpleRabbitListenerContainerFactory";
    }
}

配置消费者

@Slf4j
@Component
public class CallCompleteConsumer {

    @RabbitHandler
    @RabbitListener(queues = "my-consume-queue")
    public void onMessage(List messages, Channel channel) {

        final List messageList = messages.stream()
                .map(each -> new String(each.getBody()))
                .collect(Collectors.toList());
        log.info("[onMessage] {}: {}", messages.size(), messageList);
        try {
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            for (Message message : messages) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
运行:

先创建好对列, 修改参数,启动即可

总结:

    消息批处理比较适合像处理结果批量入库批量操作效率高于单独操作的时候, 如处理结果批量更新到Mysql, 这里说一下ack与Kafka批处理的区别, Kafka批处理ack的时候提交的是offset, 而这里的批处理只针对单个消息的tag, 必须每个都要ack.

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/698836.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号