栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

05. RabbitMQ的消息持久化

05. RabbitMQ的消息持久化

05. RabbitMQ的消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何避免消息丢失?

消费者的ACK确认机制,可以防止消费者丢失消息万一在消费者消费之前,RabbitMQ服务器宕机了,那消息也会丢失

想要将消息持久化, 必须将路由、队列和发送消息方法都要持久化才可以(所谓持久化,就是将消息写入到磁盘中。当由于mq服务器宕机,或者程序异常停止,重新启动依然可以读取到之前发送的消息)

1. 整合发布/订阅模式的持久化 生产者代码
package presistent;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import util.ConnectionUtil;


public class sender {
    public static void main(String[] args) throws Exception {
        //1.获取连接
        Connection connection = ConnectionUtil.getConnection();
        //2.在连接中建立信道
        Channel channel = connection.createChannel();
        //3.声明路由
        //参数1:路由名
        //参数2:路由键类型
        //参数3:是否持久化
        channel.exchangeDeclare("prisistent_exchange_test","fanout",true);
        //4.发送消息
        String msg = "hello";
        channel.basicPublish("prisistent_exchange_test","", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
        System.out.println("发送的消息为:" + msg);
        channel.close();
        connection.close();

    }
}
消费者1代码
package presistent;

import com.rabbitmq.client.*;
import util.ConnectionUtil;

import java.io.IOException;


public class recer1 {
    public static void main(String[] args) throws Exception {
        //1.建立连接
        Connection connection = ConnectionUtil.getConnection();
        //2.在连接中建立信道
        Channel channel = connection.createChannel();
        //2.1声明队列
        channel.queueDeclare("prisistent_exchange_queuq1",true,false,false,null);
        //2.2通过队列绑定路由
        channel.queueBind("prisistent_exchange_queuq1","prisistent_exchange_test","");
        //3定义内部类实现handDclare接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("【消费者1】="+msg);
            }

        };

        //4.监听队列
        channel.basicConsume("prisistent_exchange_queuq1",true,consumer);
    }

}
消费者2代码
package presistent;

import com.rabbitmq.client.*;
import util.ConnectionUtil;

import java.io.IOException;


public class recer2 {
    public static void main(String[] args) throws Exception {
        //1.建立连接
        Connection connection = ConnectionUtil.getConnection();
        //2.在连接中建立信道
        Channel channel = connection.createChannel();
        //2.1声明队列
        channel.queueDeclare("prisistent_exchange_queuq2",true,false,false,null);
        //2.2通过队列绑定路由
        channel.queueBind("prisistent_exchange_queuq2","prisistent_exchange_test","");
        //3定义内部类实现handDclare接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("【消费者2】="+msg);
            }

        };

        //4.监听队列
        channel.basicConsume("prisistent_exchange_queuq2",true,consumer);
    }

}
姓名:程序员阿红
喜欢:Java编程
重要的事情说三遍!!!
欢迎大家关注
欢迎大家访问
欢迎大家收藏
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775665.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号