栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Java-RabbitMq

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Java-RabbitMq

一:RabbitMq初始连接
1.创建maven工程在pom中导入

    
        
            com.rabbitmq
            amqp-client
            3.6.5
        
    

1.创建生成者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Producer {
    //队列名称
    private final static String QUEUE_NAME = "rabbitmq-test";

    public static void main(String[] argv) throws Exception {

        //1.创建一个ConnectionFactory连接工厂connectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.通过connectionFactory设置RabbitMQ所在IP等信息
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672); //指定端口
        connectionFactory.setUsername("guest");//用户名
        connectionFactory.setPassword("guest");//密码
        //3.通过connectionFactory创建一个连接connection
        Connection connection = connectionFactory.newConnection();
        //4.通过connection创建一个频道channel
        Channel channel = connection.createChannel();
        //5.通过channel指定一个队列
        
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //发送的消息
        String message = "hello world!";
        //6.通过channel向队列中添加消息,第一个参数是转发器,使用空的转发器(默认的转发器,类型是direct)
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("向" + QUEUE_NAME + "中添加了一条消息:" + message);
        //7.关闭频道
        channel.close();
        //8.关闭连接
        connection.close();
    }
}

2.创建消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;



public class Customer {
    //队列名称
    private final static String QUEUE_NAME = "rabbitmq-test";

    public static void main(String[] argv) throws Exception {
        //1.创建一个ConnectionFactory连接工厂connectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.通过connectionFactory设置RabbitMQ所在IP等信息
        connectionFactory.setHost("127.0.0.1");
        //3.通过connectionFactory创建一个连接connection
        Connection connection = connectionFactory.newConnection();
        //4.通过connection创建一个频道channel
        Channel channel = connection.createChannel();
        //5.通过channel指定队列

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //与发送消息不同的地方
        //6.创建一个消费者队列consumer,并指定channel
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //7.为channel指定消费者
        
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true) {
            //从consumer中获取队列中的消息,nextDelivery是一个阻塞方法,如果队列中无内容,则等待
            Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("接收到了" + QUEUE_NAME + "中的消息:" + message);
        }

    }
}

二.自动应答与手动应答
此配置在消费端进行配置
Channel.basicAck(用于肯定确认)
Channel.basicNack(用于否定确认)
Channel.basicReject(用于否定确认)

手动应答的优势:可以批量应答并且减少网络拥堵

public class Customer1 {
    //队列名称
    private final static String QUEUE_NAME = "rabbitmq-ack";

    public static void main(String[] args) throws Exception {
        Channel channel = ConnectionMq.getChannel();
        System.out.println("Customer1 等待接收消息处理时间较短");
        //消息消费的时候如何处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息:" + message);
            
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
//        //不公平分发
//        int perfetchCount = 1;
        //预期值
        int perfetchCount = 2;
        channel.basicQos(perfetchCount);
        //采用手动应答
        boolean autoAck = false;
        String s = channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        });
    }
}


并且在channel.basicConsume()配置手动应答

三.不公平分发与预期值

总结要做到持久化需要做的以下几点

1.队列持久化(服务端)

2.消息持久化(服务端)

3.发布确认(消费端)
包含:单个确认,批量确认,异步批量确认

    //批量个数
    public static final int count=10000;
    //单个确认
    public static void publishMessageIndividually() throws Exception {
        try (Channel channel = ConnectionMq.getChannel()) {
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, false, false, false, null);
            //开启发布确认
            channel./confirm/iSelect();
            long begin = System.currentTimeMillis();
            for (int i = 0; i < count; i++) {
                String message = i + "";
                channel.basicPublish("", queueName, null, message.getBytes());
                //服务端返回 false 或超时时间内未返回,生产者可以消息重发
                boolean flag = channel.waitFor/confirm/is();
                if(flag){
                    System.out.println("消息发送成功");
                }
            }
            long end = System.currentTimeMillis();
            System.out.println("发布" + count + "个单独确认消息,耗时" + (end - begin) +
                    "ms");
        }
    }
    
    //批量确认
    public static void publishMessageBatch() throws Exception {
        try (Channel channel = ConnectionMq.getChannel()) {
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, false, false, false, null);
            //开启发布确认
            channel./confirm/iSelect();
            //批量确认消息大小
            int batchSize = 100;
            //未确认消息个数
            int outstandingMessageCount = 0;
            long begin = System.currentTimeMillis();
            for (int i = 0; i < count; i++) {
                String message = i + "";
                channel.basicPublish("", queueName, null, message.getBytes());
                outstandingMessageCount++;
                if (outstandingMessageCount == batchSize) {
                    channel.waitFor/confirm/is();
                    outstandingMessageCount = 0;
                }
            }
            //为了确保还有剩余没有确认消息 再次确认
            if (outstandingMessageCount > 0) {
                channel.waitFor/confirm/is();
            }
            long end = System.currentTimeMillis();
            System.out.println("发布" + count + "个批量确认消息,耗时" + (end - begin) +
                    "ms");
        }
    }
    
    //异步批量确认
    public static void publishMessageAsync() throws Exception {
        try (Channel channel = ConnectionMq.getChannel()) {
            String queueName = UUID.randomUUID().toString();
            channel.queueDeclare(queueName, false, false, false, null);
            //开启发布确认
            channel./confirm/iSelect();
            
            ConcurrentSkipListMap outstandingConfirms = new
                    ConcurrentSkipListMap<>();
            
            /confirm/iCallback ackCallback = (sequenceNumber, multiple) -> {
                if (multiple) {
                    //返回的是小于等于当前序列号的未确认消息 是一个 map
                    ConcurrentNavigableMap confirmed =
                            outstanding/confirm/is.headMap(sequenceNumber, true);
                    //清除该部分未确认消息
                    /confirm/ied.clear();
                }else{
                    //只清除当前序列号的消息
                    outstanding/confirm/is.remove(sequenceNumber);
                }
            };
            /confirm/iCallback nackCallback = (sequenceNumber, multiple) -> {
                String message = outstanding/confirm/is.get(sequenceNumber);
                System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
            };
            
            channel.add/confirm/iListener(ackCallback, null);
            long begin = System.currentTimeMillis();
            for (int i = 0; i < count; i++) {
                String message = "消息" + i;
                
                outstanding/confirm/is.put(channel.getNextPublishSeqNo(), message);
                channel.basicPublish("", queueName, null, message.getBytes());
            }
            long end = System.currentTimeMillis();
            System.out.println("发布" + count + "个异步确认消息,耗时" + (end - begin) + "ms");
        } 
    }

链接: 不定时进行更新

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/270512.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号