Dynamically Adding Queues and Listeners with RabbitMQ in Spring Boot

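I had a requirement where the application has to add queues dynamically at runtime, and each queue needs its own dynamically added listener. After a good while on Google, I found that most write-ups show how to make an existing listener listen on additional queues, rather than how to add a new listener dynamically.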

So I dug into the Spring AMQP source code. Starting from RabbitListener, I found RabbitListenerAnnotationBeanPostProcessor, and from there RabbitListenerEndpointRegistry, which turns out to be able to register listeners through the following method:

public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory factory,
				boolean startImmediately)

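I gave it a try and it worked. There are plenty of articles online about adding a queue dynamically, so I won't cover that here. The rest of this post shows how to add a listener dynamically, starting with the configuration: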

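import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;

@Configuration
public class MyMqConfig implements RabbitListenerConfigurer {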
  @Bean
  public RabbitListenerErrorHandler rabbitListenerErrorHandler() {
      return (amqpMessage, message, exception) -> {
          exception.printStackTrace();
          throw exception;
      };
  }

  @Bean
  public ConnectionFactory connectionFactory() {
      return this.connectionFactory(rabbitMqProperties.getHostname(), rabbitMqProperties.getUsername(), rabbitMqProperties.getPasswd()
              , rabbitMqProperties.getPort(), rabbitMqProperties.getEnv());
  }

  @Bean
  public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory());
      factory.setConcurrentConsumers(rabbitMqProperties.getConcurrentConsumers());
      factory.setMaxConcurrentConsumers(rabbitMqProperties.getMaxConcurrentConsumers());
      factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
      factory.setAdviceChain(retryTemplate());
      factory.setPrefetchCount(rabbitMqProperties.getPrefetchCount());
      return factory;
  }

  @Bean
  public MessageHandlerMethodFactory messageHandlerMethodFactory() {
      DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
      messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
      return messageHandlerMethodFactory;
  }

  @Bean
  public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
      return new MappingJackson2MessageConverter();
  }

  @Override
  public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
      registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
  }

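  // Other configuration omitted (it presumably includes the rabbitMqProperties field,
  // the connectionFactory(...) overload, and retryTemplate() used above)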

}
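import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class MyQueueListener {

    // SLF4J logger (an assumption: the original snippet did not show how logger was declared)
    private static final Logger logger = LoggerFactory.getLogger(MyQueueListener.class);

    // Handles incoming messages; the queue a message came from is available in the queue parameter
    public void testQueues(String message, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        logger.debug("message...{}...queue...{}", message, queue);
        // Process the message
    }

}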
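
Note that testQueues carries no @RabbitListener annotation. It is a plain public method that the service shown next looks up by name and parameter types via reflection. As a minimal sketch of that lookup in isolation, the plain-Java example below resolves a method and fails fast when it is missing. DemoListener, ReflectiveLookupDemo, and resolve are hypothetical names used only for illustration; nothing here depends on Spring.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for MyQueueListener, without any Spring annotations
class DemoListener {
    public String handle(String message, String queue) {
        return queue + ":" + message;
    }
}

class ReflectiveLookupDemo {
    // Resolves a public method by name and exact parameter types, failing fast if it is absent
    static Method resolve(Class<?> type, String name, Class<?>... parameterTypes) {
        try {
            return type.getMethod(name, parameterTypes);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("No public method " + name + " on " + type.getName(), e);
        }
    }

    public static void main(String[] args) throws Exception {
        Method method = resolve(DemoListener.class, "handle", String.class, String.class);
        // Invoke the resolved method on a listener instance, as a listener container would
        Object result = method.invoke(new DemoListener(), "hello", "queue-a");
        System.out.println(result);
    }
}
```

The parameter types passed to getMethod must match the declared signature exactly, and the method must be public. Otherwise the lookup throws NoSuchMethodException, which is better surfaced immediately than swallowed.
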
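import java.lang.reflect.Method;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.MethodRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.stereotype.Service;

@Service
public class MyListenerService {

    @Autowired
    private SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory;

    @Autowired
    private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

    @Autowired
    private RabbitListenerErrorHandler rabbitListenerErrorHandler;

    @Autowired
    private MessageHandlerMethodFactory messageHandlerMethodFactory;

    @Autowired
    private MyQueueListener myQueueListener;

    // Adds a listener for the given queue
    public void addListener(String queueName) {
        Method method;
        try {
            method = MyQueueListener.class.getMethod("testQueues", String.class, String.class);
        } catch (NoSuchMethodException e) {
            // Fail fast: continuing would register an endpoint with a null method
            throw new IllegalStateException("Handler method testQueues(String, String) not found", e);
        }
        MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
        endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
        endpoint.setBean(myQueueListener);
        // The queue name doubles as the endpoint id, so ids stay unique per queue
        endpoint.setId(queueName);
        endpoint.setMethod(method);
        endpoint.setQueueNames(queueName);
        endpoint.setErrorHandler(rabbitListenerErrorHandler);
        // The last argument (startImmediately = true) starts the container right away
        rabbitListenerEndpointRegistry.registerListenerContainer(endpoint, rabbitListenerContainerFactory, true);
    }

}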
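
One thing to watch for: addListener uses the queue name as the endpoint id, and as far as I can tell from the Spring AMQP source, RabbitListenerEndpointRegistry refuses to register a second endpoint under an id that is already taken. The following is a minimal sketch, in plain Java, of one way to make repeated calls for the same queue harmless. ListenerRegistrationGuard and addIfAbsent are hypothetical names, not part of Spring AMQP, and the Consumer<String> stands in for a call such as MyListenerService.addListener.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: makes "add a listener for this queue" safe to call repeatedly
class ListenerRegistrationGuard {
    private final Set<String> registered = ConcurrentHashMap.newKeySet();
    private final Consumer<String> registrar;

    ListenerRegistrationGuard(Consumer<String> registrar) {
        this.registrar = registrar;
    }

    // Returns true if the delegate was invoked, false if the queue was already registered
    boolean addIfAbsent(String queueName) {
        if (!registered.add(queueName)) {
            return false;
        }
        try {
            registrar.accept(queueName);
            return true;
        } catch (RuntimeException e) {
            // Forget the name again so a later retry is possible
            registered.remove(queueName);
            throw e;
        }
    }
}

class ListenerRegistrationGuardDemo {
    public static void main(String[] args) {
        ListenerRegistrationGuard guard =
                new ListenerRegistrationGuard(q -> System.out.println("registering " + q));
        System.out.println(guard.addIfAbsent("orders")); // delegate runs
        System.out.println(guard.addIfAbsent("orders")); // duplicate is skipped
    }
}
```

In the service above, the delegate would be something like myListenerService::addListener, so only the first request for a given queue name reaches the registry.
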
