栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SpringBoot集成RabbitMQ如何使用多线程进行消费

SpringBoot集成RabbitMQ如何使用多线程进行消费

文章目录
  • 前言
  • 一、安装RabbitMQ
    • 1.安装erlang语言
    • 2.安装MQ
    • 3.可视化界面
  • 二、springboot集成RabbitMq简单使用
    • 1.引入依赖
    • 2.yml配置
    • 3.申明队列
    • 4.发送消息和消息消费
  • 三 rabbitmq高级使用
    • 1.SimpleMessageListenerContainer
    • 2.@RabbitListener
    • 3.手动确认机制
  • 总结


前言

SpringBoot家族对rabbitmq提供了非常好了集成,今天主要是针对springboot中如何使用rabbitmq以及在业务中如何多线程处理消息


一、安装RabbitMQ

老规矩,安装MQ

1.安装erlang语言

https://www.erlang.org/downloads

erlang语言需要设置环境,和jdk环境设置一样

2.安装MQ

https://www.rabbitmq.com/download.html

启动sbin目录下的rabbitmq-server.bat

3.可视化界面

http://localhost:15672

默认账户密码 : guest

二、springboot集成RabbitMq简单使用 1.引入依赖
    
    
      org.springframework.boot
      spring-boot-starter-amqp
    
2.yml配置
spring:
  rabbitmq:
    host: ${RABBIT_MQ_HOST:xxx.xxx.xxx}
    port: ${RABBIT_MQ_PORT:xxxx}
    username: ${RABBIT_MQ_USERNAME:xxxx}
    password: ${RABBIT_MQ_PASSWORD:xxxx}
    virtual-host: /    #类似于mysql的库
    listener:          
      direct:          #容器模型
        acknowledge-mode: MANUAL
      simple:
        acknowledge-mode: MANUAL
        retry:        #重试
          enabled: true
          max-attempts: 3
    publisher-/confirm/is: true   #确认机制
    publisher-returns: true

如果紧使用简单的功能,那么可以省略virtual-host以下的部分

3.申明队列
    //声明队列:针对于转码的队列
    @Bean(QUEUE_WORKQUEUE_TRANSCODE)
    public Queue QUEUE_WORKQUEUE_TRANSCODE() {
        Queue queue = new Queue(QUEUE_WORKQUEUE_TRANSCODE,true,false,false);
        //参数为队列,持久化,排他性,自动删除
        return queue;
    }

使用默认交换机。所以不用申明交换机和绑定

4.发送消息和消息消费
rabbitTemplate.convertAndSend("交换机(不写为默认交换机)", "队列名", 
					"消息体",
					new CorrelationData(aiTask.getId())); // 可省略,确认机制使用

	@RabbitListener(queues = "队列名")
    public void handleMessage(byte[] message){
        System.out.println("消费消息");
        System.out.println(new String(message));
    }
三 rabbitmq高级使用

两种方式,使用container容器,使用注解@RabbitListener指定容器工厂

1.SimpleMessageListenerContainer
@Bean
    public SimpleMessageListenerContainer AiTaskListenerContainer() {
        SimpleMessageListenerContainer container = new  SimpleMessageListenerContainer(cachingConnectionFactory);
        // 监听队列名
        container.setQueueNames(AiMqConfig.QUEUE_WORK_AI_TASK);
        // 当前消费者数量 开启几个线程去处理数据 支持运行时动态修改
        container.setConcurrentConsumers(5);
        // 最大消费者数量  ,  消息堵塞太多的时候,会帮我自动扩展到我的最大消费者数量
        container.setMaxConcurrentConsumers(10);
        // 是否重回队列
        container.setDefaultRequeueRejected(true);
        // 手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置监听器
        container.setMessageListener(new ChannelAwareMessageListener(){
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                // 消息的唯一性ID deliveryTag:该消息的index 自增长
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                String id  = "";
                try {
                    String msg = message.toString();
                    byte[] messageBody = message.getBody();
                    String s = new String(messageBody);
                    AiMqMessageVo mqMessageVo = JSONObject.parseObject(s, AiMqMessageVo.class);
                    System.out.println("消息: " + msg);
                    System.out.println("消息来自: "+message.getMessageProperties().getConsumerQueue());
                    System.out.println(mqMessageVo);
                    // 同一时刻服务器只会发一条消息给消费者,能者多劳模式
                    channel.basicQos(1);
                    // 手动确认
                    channel.basicAck(deliveryTag, true);
                } catch (Exception e) {
                    // 拒绝策略
                    //basicReject一次只能拒绝接收一个消息,而basicNack方法可以支持一次0个或多个消息的拒收,并且也可以设置是否requeue。
                    channel.basicReject(deliveryTag, false); //是否批量 false
                              System.out.println("签收失败");
                }
            }
        });
        return container;
    }
2.@RabbitListener
@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_WORK_AI_TASK = "queue_work_ai_task";
    // 声明交换机
    // 声明队列
	@Bean("AIContainerFactory")
	public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, 
		ConnectionFactory connectionFactory) {
   		SimpleRabbitListenerContainerFactory container= new SimpleRabbitListenerContainerFactory();
   		container.setConcurrentConsumers(5);
   		container.setMaxConcurrentConsumers(10);
  		configurer.configure(factory, connectionFactory);
  		// 是否重回队列
        container.setDefaultRequeueRejected(true);
        // 手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
   		return factory;
 	}
 }
@Component
public class ReceiveHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(ReceiveHandler.class);
    // 在@RabbitListener注解中指定容器工厂
    @RabbitListener(queues = {RabbitmqConfig.QUEUE_WORK_AI_TASK },
                    containerFactory = "AIContainerFactory")
   //@RabbitListener(queues = {MqConstants.OA_TAB_REFRESH}, concurrency = "10") 也能实现十个并发
    public void aiTask(String msg,Message message,Channel channel){
        LOGGER.info("receive message is:"+msg);
    }
}
3.手动确认机制

在rabbitmqconfig中添加确认机制,所以在发送消息时,要添加消息的id作为唯一标识

//消息发送确认模式  /confirm/iCallback确认模式
    @PostConstruct
    public void initRabbitTemplate(){
        
        rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
            @Override
            public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
                if (!ack){
                    //修改数据库
                    aiTaskService.update(new UpdateWrapper()
                            .eq("id",correlationData.getId())
                            .set("cause",cause)
                            .set("state", AiTask.AI_WORK_FAILURE));
                }
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //修改数据库
                String s = new String(message.getBody());
                AiMqMessageVo aiMqMessageVo = JSONObject.parseObject(s, AiMqMessageVo.class);
                aiTaskService.update(new UpdateWrapper()
                        .eq("id",aiMqMessageVo.getId())
                        .set("cause","未成功投递给指定队列。信息 - > 状态码:"+replyCode + " 文本内容:"+replyText  + "  交换机:"+exchange  + "  路由:"+routingKey)
                        .set("state", AiTask.AI_WORK_FAILURE));
            }
        });
    }
总结

写这些代码已经过去一两个月了,回看一下也有点蒙蔽,有很多不足的地方,代码也是以我目前做的项目来的,一些变量名可以换成你自己的变量名,主要是理清思路。一些难理解的地方,我已经做了详细的注释,不懂可以再次百度。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/581751.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号