栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Rabbitmq集成与使用

Rabbitmq集成与使用

Springboot集成rabbitmq pom.xml 依赖
		
            org.springframework.boot
            spring-boot-starter-amqp
            2.2.5.RELEASE
        
application.yml

通用配置,实际根据需要添加修改

spring:
  #配置rabbitMq 服务器
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin
    # 重试次数,默认为3次
    listener:
      simple:
        retry:
          enabled: true
          max-attempts: 5
        # 手动ack
        acknowledge-mode: manual
    # ack
    publisher-/confirm/i-type: correlated
    # 发送失败返回
    publisher-returns: true
DirectRabbitConfig

队列配置类,以direct模式为例,其他模式类似。定义queue,定义exchange,绑定queue与exchange。

生产者,交换机,多个消息队列,多个消费者

package cloud.lcx.learn.common.config.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



@Configuration
public class DirectRabbitConfig {

    public static  final  String MY_DIRECT_QUEUE = "MyDirectQueue";
    public static  final  String MY_DIRECT_EXCHANGE = "MyDirectExchange";
    public static  final  String ROUTINGKEY = "hadoop";


    // direct queue
    @Bean
    public Queue myDirectQueue(){
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue(MY_DIRECT_QUEUE,true,true,false);
    }

    //Direct交换机 起名:TestDirectExchange
    @Bean
    DirectExchange myDirectExchange() {
        return new DirectExchange(MY_DIRECT_EXCHANGE,true,false);
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(myDirectQueue()).to(myDirectExchange()).with(ROUTINGKEY);
    }

}
  • direct 如果路由键完全匹配的话,消息才会被投放到相应的队列,应用最多。

  • fanout 当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。

  • topic 设置模糊的绑定方式,“*”操作符将“.”视为分隔符,匹配单个字符;“#”操作符没有分块的概念,它将任意“.”均视为关键字的匹配部分,能够匹配多个字符,应用较多。

  • header 交换器允许匹配 AMQP 消息的 header 而非路由键,除此之外,header 交换器和 direct 交换器完全一致,但是性能却差很多,因此基本上不会用到该交换器

这里说的是消息怎么从交换机(exchange)到队列(queue)的过程,按以上的规则进行。而消息怎么从队列中被消费是竞争的,比如说在应用程序多节点部署情况下,会存在多个节点监听某个队列,一般情况下消息只会被消费一次,默认情况下是轮询的。

MqAckConfig

全局ACK配置,配置消息确认消费处理器,配置消息发送失败处理器。(非必须)

package cloud.lcx.learn.common.config.mq;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;


@Configuration
public class MqAckConfig {

    @Autowired
    RabbitTemplate rabbitTemplate;

    //初始化加载方法,对RabbitTemplate进行配置
    @PostConstruct
    void rabbitTemplate(){
        //消息发送确认,发送到交换器Exchange后触发回调
        rabbitTemplate.set/confirm/iCallback(new ReturnCallBackHandler());
        //消息发送确认,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)
        rabbitTemplate.setReturnCallback(new ReturnHandler());
        //自定义格式转换
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // 设置为自动提交,即使配置文件添加了配置
        factory.setConnectionFactory(connectionFactory);
        // 手动设置手动ACK
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}
ConfirmCallback

定义消息确认消费处理器(非必须)

package cloud.lcx.learn.common.config.mq;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;


@Slf4j
public class  ReturnCallBackHandler implements RabbitTemplate./confirm/iCallback {
    @Override
    public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            log.info("rabbitMq Ack : {} recved",correlationData);
        }else{
            log.info("rabbitMq Ack : {} unrecved,cause {}",correlationData,cause);
        }
    }
}

这此处处理消费者Ack或Nack的回调逻辑。

ReturnCallback

定义消息发送失败处理器(非必须)

package cloud.lcx.learn.common.config.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;


@Slf4j
public class  ReturnHandler implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("rabbitmq : {} returned in {} with {},{}",message,exchange,replyCode,replyText);
    }
}

与/confirm/iCallback区别在于,此处一般处理的是事消息发送异常事件,消息未曾到达消费者那边。

MqMessage

可以定义一个统一的消息格式(非必须)

package cloud.lcx.learn.common.config.mq.model;

import lombok.Data;


@Data
public class MqMessage {
    String appId;
    String msgBody;
}

生产者发送消息

使用RabbitTemplate发送消息,和redis类似

//使用RabbitTemplate,这提供了接收/发送等等方法
@Autowired
RabbitTemplate rabbitTemplate;
MqMessage message = new MqMessage();
message.setAppId("rabbitMq application");
message.setMsgBody("hello rabbit!");
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange,指定消息唯一ID
rabbitTemplate.convertAndSend(DirectRabbitConfig.MY_DIRECT_EXCHANGE, DirectRabbitConfig.ROUTINGKEY, message,new CorrelationData(UUID.randomUUID().toString()) );
消费者消费消息
@Component
@Slf4j
public class RabbitDirectReceiver {

    

    @RabbitListener(queues = DirectRabbitConfig.MY_DIRECT_QUEUE, containerFactory="rabbitListenerContainerFactory")
    public void process(Message message, Channel channel) throws IOException {
        try {
            // 业务代码
            // .......
            // multiple: false 只确认当前消费者已消费消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            // multiple: true  确认改消息已被所有消费者消费
            //channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        }catch (Exception e){
            log.error("消息处理异常!",e);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
        }
    }
}

一般情况下,为防止消息人为丢失(代码错误导致消息消费异常而引起消息丢失),会开启手动消费确认,在客户端在取出消息处理完成后,手动回复确认是否消费成功,api如下:

成功消费

 // multiple: false 只确认当前消费者已消费消息
 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

消费失败

// 消费失败,消息回到队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
// 消费失败,删除消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);

multiple批量确认,我用的比较少,一般都是只确认自己节点,可能在一些特殊场景下会用。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/422921.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号