栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

rabbitmq基础

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

rabbitmq基础

安装和配置的过程省略。
简单的使用:
1.创建配置类(消费者和生产者都需要):

@Configuration
public class RabbitMqConfig {
    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
    public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
    public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";

    //创建业务交换机
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);//扇出交换机 发布订阅
    }

    //创建死信交换机
    @Bean("deadExchange")
    public DirectExchange deadExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);//直接相连的交换机 只有 routerKey相同才会通信
    }

    //创建业务队列A
    @Bean("businessQueueA")
    public Queue businessQueueA(){
        Map map=new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUTING_KEY);//表明死信交换机那个routerKey 是我要发送没用消息的队列
        
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME)
                .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey(DEAD_LETTER_QUEUEA_ROUTING_KEY).build();
    }

    //创建业务队列A
    @Bean("businessQueueB")
    public Queue businessQueueB(){
        Map map=new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);//绑定要发送死信到哪个交换机
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUTING_KEY);//表明死信交换机那个routerKey 是我要发送没用消息的队列
        
        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME)
                .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey(DEAD_LETTER_QUEUEB_ROUTING_KEY).build();
    }

    //创建死信队列A
    @Bean("deadQueueA")
    public Queue deadQueueA(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUEA_NAME).build();
    }

    //创建死信队列B
    @Bean("deadQueueB")
    public Queue deadQueueB(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUEB_NAME).build();
    }

    //绑定业务交换机和队列 发布订阅模式不需要routerkey
    @Bean
    public Binding businessBindingA(@Qualifier("businessExchange") FanoutExchange exchange,
                                    @Qualifier("businessQueueA") Queue queueA){
        return BindingBuilder.bind(queueA).to(exchange);
    }

    
    //绑定业务交换机和队列
    @Bean
    public Binding businessBindingB(@Qualifier("businessExchange") FanoutExchange exchange,
                                    @Qualifier("businessQueueB") Queue queueB){
        return BindingBuilder.bind(queueB).to(exchange);
    }

    //绑定死信队列和它的交换机
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadExchange") DirectExchange exchange,
                                      @Qualifier("deadQueueA") Queue queueA){
        return BindingBuilder.bind(queueA).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

    //绑定死信队列和它的交换机
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadExchange") DirectExchange exchange,
                                      @Qualifier("deadQueueB") Queue queueB){
        return BindingBuilder.bind(queueB).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}

版本自己决定
		
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-web
        
生产者:
spring:
  application:
    name: nacos-consumer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        default-requeue-rejected: false #被拒绝后是否重新入队
        acknowledge-mode: manual #手动应答

消息发送的类:

@Component
public class BusinessMessageSender {
    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg){
        rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
    }
}
======================================================================================
发送消息接口
@RequestMapping("rabbitmq")
@RestController
public class RabbitMqController {

    @Autowired
    private BusinessMessageSender sender;

    @RequestMapping("sendmsg")
    public String sendMsg(String msg){
        System.out.println(msg);
        sender.sendMsg(msg);
        return msg;
    }
}
消费者A:
spring:
  application:
    name: nacos-provider-9001
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        default-requeue-rejected: false #被拒绝后是否重新入队
        acknowledge-mode: manual #手动应答
配置类:
@Configuration
public class RabbitMqConfig {
    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";

    //创建业务交换机
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);//扇出交换机 发布订阅
    }

    //创建死信交换机
    @Bean("deadExchange")
    public DirectExchange deadExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);//直接相连的交换机 只有 routerKey相同才会通信
    }

    //创建业务队列A
    @Bean("businessQueueA")
    public Queue businessQueueA(){
        Map map=new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEA_ROUTING_KEY);//表明死信交换机那个routerKey对应的队列 是我要发送没用消息的队列
        
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME)
                .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey(DEAD_LETTER_QUEUEA_ROUTING_KEY).build();
    }

    //创建死信队列A
    @Bean("deadQueueA")
    public Queue deadQueueA(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUEA_NAME).build();
    }


    //绑定业务交换机和队列 发布订阅模式不需要routerkey
    @Bean
    public Binding businessBindingA(@Qualifier("businessExchange") FanoutExchange exchange,
                                    @Qualifier("businessQueueA") Queue queueA){
        return BindingBuilder.bind(queueA).to(exchange);
    }

    //绑定死信队列和它的交换机
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadExchange") DirectExchange exchange,
                                      @Qualifier("deadQueueA") Queue queueA){
        return BindingBuilder.bind(queueA).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }
}

业务队列A的消息接收类:

@Component
@Slf4j
public class BunsinessMessageReceive {

    @RabbitListener(queues = {RabbitMqConfig.BUSINESS_QUEUEA_NAME})
    public void ReceiveA(Message message, Channel channel) throws Exception{
        String msg=new String(message.getBody());
        log.info("接收到消息A: "+msg);
        boolean ack=true;//是否应答
        Exception exception=null;
        try {
            if (msg.contains("deadletter")){
                throw new IllegalArgumentException("dead letter exception");
            }
        }catch (Exception e){
            ack=false;//拒绝应答
            exception=e;
        }
        if(ack){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }else {
            log.error("消息发生异常 , error message : "+exception.getMessage());
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//拒绝应答
        }
    }
}

业务队列A对应的死信队列A消息接收代码:

@Component
public class DeadLetterMessageReceive {

    //死信队列A消息的收到
    @RabbitListener(queues = {DEAD_LETTER_QUEUEA_NAME})
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息A:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

消费者B:

@Configuration
public class RabbitMqConfig {
    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
    public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";

    //创建业务交换机
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);//扇出交换机 发布订阅
    }

    //创建死信交换机
    @Bean("deadExchange")
    public DirectExchange deadExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);//直接相连的交换机 只有 routerKey相同才会通信
    }

    //创建业务队列B
    @Bean("businessQueueB")
    public Queue businessQueueB(){
        Map map=new HashMap<>();
        map.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);//绑定要发送死信到哪个交换机
        map.put("x-dead-letter-routing-key",DEAD_LETTER_QUEUEB_ROUTING_KEY);//表明死信交换机那个routerKey对应的队列 是我要发送没用消息的队列
        //两种方式绑定  本质上都是一样的
        
        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME)
                .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                .deadLetterRoutingKey(DEAD_LETTER_QUEUEB_ROUTING_KEY).build();
    }

    //创建死信队列B
    @Bean("deadQueueB")
    public Queue deadQueueB(){
        return QueueBuilder.durable(DEAD_LETTER_QUEUEB_NAME).build();
    }

    //绑定业务交换机和队列
    @Bean
    public Binding businessBindingB(@Qualifier("businessExchange") FanoutExchange exchange,
                                    @Qualifier("businessQueueB") Queue queueB){
        return BindingBuilder.bind(queueB).to(exchange);
    }

    //绑定死信队列和它的交换机
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadExchange") DirectExchange exchange,
                                                                        @Qualifier("deadQueueB") Queue queueB){
        return BindingBuilder.bind(queueB).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}

业务队列B的消息接收类:

//接收消息
@Component
@Slf4j
public class BunsinessMessageReceive {

    @RabbitListener(queues = {RabbitMqConfig.BUSINESS_QUEUEB_NAME})
    public void ReceiveA(Message message, Channel channel) throws Exception{
        String msg=new String(message.getBody());
        log.info("接收到消息B: "+msg);
        boolean ack=true;//是否应答
        Exception exception=null;
        try {
            if (msg.contains("deadletter")){
                throw new IllegalArgumentException("dead letter exception");
            }
        }catch (Exception e){
            ack=false;//拒绝应答
            exception=e;
        }
        if(ack){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }else {
            log.error("消息发生异常 , error message : "+exception.getMessage());
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//拒绝应答
        }
    }
}

业务队列A对应的死信队列A消息接收代码:

@Component
public class DeadLetterMessageReceive {

    //死信队列A消息的收到
    @RabbitListener(queues = {RabbitMqConfig.DEAD_LETTER_QUEUEB_NAME})
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

访问接口:http://localhost:8080/rabbitmq/sendmsg?msg=msg

再看这个:http://localhost:8080/rabbitmq/sendmsg?msg=deadletter
死信队列也同时处理成功了

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/873874.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号