栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ死信队列

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ死信队列

死信队列原理图:

 Producer.java

package com.zheng.eight;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.zheng.utils.RabbitMqUtil;

public class Producer {
    public static final String NORMAL_EXCHANGE= "normal_exchange";
    //普通交换机的名称
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        //死信消息 设置ttl时间
        for (int i = 0; i < 11; i++) {
            String message = "info"+i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());

        }
    }
}

Consumer1.java

package com.zheng.eight;

import com.rabbitmq.client.*;
import com.zheng.utils.RabbitMqUtil;

import java.util.HashMap;
import java.util.Map;
//死信
//消费者1
public class Consumer01 {
    //普通的交换机
    public static final String NORMAL_EXCHANGE="normal_exchange";
    //死信交换机
    public static final String DEAD_EXCHANGE="dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE="normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明死信和普通交换机 类型为direct
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
        
        Map arguments = new HashMap<>() ;
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信Routingkey
        arguments.put("x-dead-letter-routing-key","lisi");
        //过期时间 毫秒
        arguments.put("x-message-ttl",100000);

        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,NORMAL_EXCHANGE,"lisi");
        channel.queueDeclare();
        DeliverCallback deliverCallback= (consumerTag,message)->{
            System.out.println("Consumer01接收的消息是"+new String(message.getBody(),"utf-8"));
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("未确认消息");
        };
        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback,null);

    }
}

Consumer2.java

package com.zheng.eight;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zheng.utils.RabbitMqUtil;

public class Consumer02 {
//消费者2
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtil.getChannel();
        System.out.println("等待接收消息");
        DeliverCallback deliverCallback=(consumerTag,message)->{
            System.out.println("Consumer02接收的消息是:"+new String(message.getBody(),"utf-8"
            ));
        };
        CancelCallback cancelCallback = (consumerTag)->{

        };
        channel. basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback,null);
    }
}

要想懂代码,必须先看懂原理图

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/345181.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号