栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

springBoot RabbitMq流量削峰,可靠性传递

springBoot RabbitMq流量削峰,可靠性传递

导入依赖

    
        
        
            org.springframework.boot
            spring-boot-starter
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.projectlombok
            lombok
        

        
            com.fasterxml.jackson.core
            jackson-databind
            2.9.9
        

        
            org.springframework.boot
            spring-boot-starter-amqp
        
    

消息消费者
配置文件

spring:
  rabbitmq:
    host: 192.168.200.130 # ip
    port: 5672
    username: admin
    password: admin
    virtual-host: /itcast
    listener:
      type: simple
      simple:
        prefetch: 1 #消费者每次从队列获取的消息数量
        concurrency: 2 #消费者数量
        max-concurrency: # 启动消费者最大数量


server:
  port: 8001

生产消息

    @Test
    public void testSend(){
        for (int i=0;i<10;i++){
            rabbitTemplate.convertAndSend(RabbitMQConfig1.QUEUE_NAME,"这是普通模式");
        }
    }

接收消息

@Component
public class RabbimtMQListener {

    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(Message message){
        System.out.println("消费者1"+new String(message.getBody())+System.currentTimeMillis());
         模拟处理需要1秒
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

结果打印

可靠性传递

配置文件

spring:
  rabbitmq:
    host: 192.168.200.130 # ip
    port: 5672
    username: admin
    password: admin
    virtual-host: /itcast
    listener:
      type: simple
      simple:
        prefetch: 1 #消费者每次从队列获取的消息数量
        concurrency: 2 #消费者数量
        max-concurrency: # 启动消费者最大数量
    publisher-/confirm/is: true #确认消息已发送到交换机(Exchange) 可以把publisher-/confirm/is: true 替换为  publisher-/confirm/i-type: correlated
    publisher-returns: true #确认消息已发送到队列(Queue)

server:
  port: 8001

配置文件

@Configuration
public class RabbitMQConfig1 {

    public static final String QUEUE_NAME = "boot_queue";

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);

        //设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。
        //我们既然要做可靠性,当然是设置为返回到客户端(true是返回客户端,false是自动删除)
        rabbitTemplate.setMandatory(true);

        //消息发送到交换机,是否成功收到消息,true成功,false失败
        rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
            @Override
            public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println();
                System.out.println("相关数据:" + correlationData);
                if (ack) {
                    System.out.println("投递成功,确认情况:" + ack);
                } else {
                    System.out.println("投递失败,确认情况:" + ack);
                    System.out.println("原因:" + cause);
                }
            }
        });

        //消息发送到交换机,在有交换机发送到队列失败,才会执行
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println();
                System.out.println("ReturnCallback:     " + "消息:" + message);
                System.out.println("ReturnCallback:     " + "回应码:" + replyCode);
                System.out.println("ReturnCallback:     " + "回应信息:" + replyText);
                System.out.println("ReturnCallback:     " + "交换机:" + exchange);
                System.out.println("ReturnCallback:     " + "路由键:" + routingKey);
                System.out.println();
            }
        });

        return rabbitTemplate;
    }
}
消费端手动确认
@Component
public class RabbimtMQListener3 {

    @RabbitListener(queues = "boot_queue")
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(message.toString());
//            int i=1/0;
            channel.basicAck(deliveryTag,true);  //手动确认
            System.out.println("手动确认");
        } catch (IOException e) {
            //拒绝签收
            //第三个参数,重回队列重新发送
            channel.basicNack(deliveryTag,true,true);
            System.out.println("拒绝签收");
        }

    }

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/307265.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号