栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

rabbitMQ学习笔记

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

rabbitMQ学习笔记

rabbitMQ消息轮训
  • 抽取工具类
package utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class RabbitMqUtils {
    //得到一个连接的channel
    public static Channel getChannel() throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂ip 链接mQ队列
        factory.setHost("124.220.3.19");
        //用户名
        factory.setUsername("root");
        //密码
        factory.setPassword("root");
        //创建链接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        return channel;
    }
}
消费者代码
package two;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMqUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Worker01 {

    //队列的名称
    public static final String QUEUE_NAME = "hello";

    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();

        //消息的接收
        //声明_接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("接收到的消息" + new String(message.getBody()));
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag + "消息者取消消费接口回调逻辑");
        };
        
        System.out.println("C2_等待接收消息......");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

    }

}

生产者代码
package two;

import com.rabbitmq.client.Channel;
import utils.RabbitMqUtils;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


public class Task01 {
    //队列的名称
    public static final String QUEUE_NAME = "hello";

    //发送大量的消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //从控制台中接收信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("发送消息完成:" + message);
        }


    }


}

运行结果
  • 生产者发送消息

  • 消费者1

  • 消费者2

手动应答
  • 睡眠工具类
package utils;

public class SleepUtils {
    public static void sleep(int second) {
        try {
            Thread.sleep(1000 * second);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
  • 生产者类
package three;

import com.rabbitmq.client.Channel;
import utils.RabbitMqUtils;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


public class Task02 {
    //队列的名称
    public static final String TASK_QUEUE_NAME = "ack_queue";
    //发送大量的消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        //从控制台中接收信息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("发送消息完成:" + message);
        }
    }
}

  • 消费者1
package three;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMqUtils;
import utils.SleepUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Worker03 {
    //队列的名称
    public static final String TASK_QUEUE_NAME = "ack_queue";
    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1_等待接收消息_时间较短......");
        
        //消息的接收
        //声明_接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            SleepUtils.sleep(1);
            System.out.println("接收到的消息" + new String(message.getBody()));
            //手动应答
            
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag + "消息者取消消费接口回调逻辑");
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

  • 消费者2
package three;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMqUtils;
import utils.SleepUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Worker04 {
    //队列的名称
    public static final String TASK_QUEUE_NAME = "ack_queue";
    //接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2_等待接收消息_时间较长......");
        
        //消息的接收
        //声明_接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {

            SleepUtils.sleep(30);
            System.out.println("接收到的消息" + new String(message.getBody()));
            //手动应答
            
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println(consumerTag + "消息者取消消费接口回调逻辑");
        };
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

  • 运行结果


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/1039588.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号